<a href="https://colab.research.google.com/github/RafaelAnga/Artificial-Intelligence/blob/main/A3C/A3C_For_Frogger_Game.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Problem Context
Self-learning AI, powered by reinforcement learning, is transforming how we approach complex decision-making tasks—from robotics and autonomous vehicles to game playing and resource optimization. By enabling agents to learn optimal strategies through trial and error, these algorithms can adapt to unpredictable environments and discover solutions that are often unintuitive to humans.




## About This Project
In this project, I developed an Asynchronous Advantage Actor-Critic (A3C) agent to play the classic game Frogger. The A3C algorithm leverages parallel environments and a combination of policy (actor) and value (critic) networks to efficiently learn challenging tasks. This implementation demonstrates how reinforcement learning can be applied to environments with continuous feedback and high variability, highlighting the strengths of self-learning agents in mastering complex behaviors without explicit programming.

By successfully training an agent to navigate Frogger’s obstacles, this project serves as a step forward in understanding and applying advanced reinforcement learning techniques. It also showcases the versatility of self-learning AI, which can be extended to a wide variety of real-world applications beyond games.

# A3C for Frogger Game



This notebook targets the Atari game Frogger from the Gymnasium environment collection.
It trains a reinforcement learning agent to play the game, with the goal of reaching a score of 500.

Website: https://ale.farama.org/environments/frogger/

## Part 0 - Installing the required packages and importing the libraries

### Installing Gymnasium

In [None]:
!pip install gymnasium
!pip install "gymnasium[atari, accept-rom-license]"
!pip install ale-py
!apt-get install -y swig
!pip install "gymnasium[box2d]"

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
swig is already the newest version (4.0.2-1ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.


### Importing the libraries

In [None]:
import cv2
import math
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.multiprocessing as mp
import torch.distributions as distributions
from torch.distributions import Categorical
import ale_py
import gymnasium as gym
from gymnasium.spaces import Box
from gymnasium import ObservationWrapper

## Part 1 - Building the AI

### Creating the architecture of the Neural Network

In [None]:
# Define the Network class: three convolutional layers feeding an actor head and a critic head
class Network(nn.Module):

    def __init__(self, action_size):
        super(Network, self).__init__()
        self.conv1 = torch.nn.Conv2d(in_channels=4, out_channels=32, kernel_size=(3, 3), stride=2)
        self.conv2 = torch.nn.Conv2d(in_channels=32, out_channels=32, kernel_size=(3, 3), stride=2)
        self.conv3 = torch.nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(3, 3), stride=2)
        self.flatten = torch.nn.Flatten()
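        # 5184 = 64 channels * 9 * 9, the feature map left after three stride-2 convolutions on 84x84 input
        self.fc1 = torch.nn.Linear(5184, 128)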
        self.fc2a = torch.nn.Linear(128, action_size)
        self.fc2s = torch.nn.Linear(128, 1)

    def forward(self, state):
        x = self.conv1(state)
        x = F.relu(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = self.conv3(x)
        x = F.relu(x)
        x = self.flatten(x)
        x = F.relu(self.fc1(x))
        action_values = self.fc2a(x)
        state_value = self.fc2s(x)
        return action_values, state_value
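
The input size of `fc1` follows from the convolution arithmetic. As a minimal sketch, assuming 84x84 inputs and no padding (the `Conv2d` default), the helper below (`conv_out` is a name introduced here for illustration) reproduces the 5184 figure:

```python
def conv_out(size, kernel=3, stride=2, padding=0):
    """Output length of one spatial dimension after a convolution layer."""
    return (size + 2 * padding - kernel) // stride + 1

size = 84
for _ in range(3):          # conv1, conv2, conv3 all use kernel 3, stride 2
    size = conv_out(size)   # 84 -> 41 -> 20 -> 9

flat_features = 64 * size * size  # conv3 has 64 output channels
print(size, flat_features)        # 9 5184
```

If the input resolution or the strides change, the first argument of `fc1` has to change with them.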


## Part 2 - Training the AI

### Setting up the environment

In [None]:
# Preprocess Atari images
class PreprocessAtari(ObservationWrapper):
    def __init__(self, env, height=84, width=84, crop=lambda img: img, dim_order='pytorch', n_frames=4):
        super(PreprocessAtari, self).__init__(env)
        self.img_size = (height, width)
        self.crop = crop
        self.dim_order = dim_order
        self.frame_stack = n_frames
        n_channels = n_frames
        obs_shape = (n_channels, height, width) if dim_order == 'pytorch' else (height, width, n_channels)
        self.observation_space = Box(low=0.0, high=1.0, shape=obs_shape, dtype=np.float32)
        self.frames = np.zeros(obs_shape, dtype=np.float32)

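    def reset(self, **kwargs):
        # Start from an empty frame stack and forward seed/options to the wrapped environment
        self.frames = np.zeros_like(self.frames)
        obs, info = self.env.reset(**kwargs)
        self.update_buffer(obs)
        return self.frames, info

    def observation(self, img):
        img = self.crop(img)
        # cv2.resize expects the target size as (width, height)
        img = cv2.resize(img, (self.img_size[1], self.img_size[0]))
        img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) / 255.0
        # Drop the oldest frame and append the newest one (assumes the channels-first layout)
        self.frames = np.roll(self.frames, shift=-1, axis=0)
        self.frames[-1] = img
        return self.frames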

    def update_buffer(self, obs):
        self.frames = self.observation(obs)

# Create the environment
def make_env():
    env = gym.make("ALE/Frogger-v5", render_mode="rgb_array")
    env = PreprocessAtari(
        env,
        height=84,
        width=84,
        crop=lambda img: img[0:210, :, :],
        dim_order="pytorch",
        n_frames=4
    )
    return env

# Initialize environment and print shape information
env = make_env()
state_shape = env.observation_space.shape
number_actions = env.action_space.n
print("State shape:", state_shape)
print("Number actions:", number_actions)


State shape: (4, 84, 84)
Number actions: 5
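
The four channels in the state shape are the four most recent grayscale frames. A minimal sketch of that rolling buffer, using tiny 2x2 frames for readability (`push` is a hypothetical helper, not part of the notebook):

```python
import numpy as np

frames = np.zeros((4, 2, 2), dtype=np.float32)  # (n_frames, height, width)

def push(frames, img):
    """Return a new stack with the oldest frame dropped and img appended."""
    frames = np.roll(frames, shift=-1, axis=0)
    frames[-1] = img
    return frames

# Feed five frames whose pixels all equal the time step 1..5
for t in range(1, 6):
    frames = push(frames, np.full((2, 2), t, dtype=np.float32))

print(frames[:, 0, 0])  # [2. 3. 4. 5.]
```

Stacking frames lets the network infer motion, such as the direction of a car, which a single frame cannot show.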


### Initializing the hyperparameters

In [None]:
learning_rate = 5e-4
discount_factor = 0.97
number_environments = 10
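
To get a feel for what `discount_factor = 0.97` implies, the sketch below computes two rough horizon measures and a small discounted return. The function name `discounted_return` is introduced here for illustration only.

```python
import math

gamma = 0.97

# Rule-of-thumb horizon: rewards further away than this contribute little
effective_horizon = 1.0 / (1.0 - gamma)            # about 33 steps

# Number of steps after which a reward is weighted by one half
steps_to_half = math.log(0.5) / math.log(gamma)    # about 22.8 steps

def discounted_return(rewards, gamma):
    """Sum of rewards weighted by gamma**t, accumulated from the last step backwards."""
    g = 0.0
    for r in reversed(rewards):
        g = r + gamma * g
    return g

print(round(effective_horizon, 1), round(steps_to_half, 1))
print(discounted_return([1.0, 1.0, 1.0], gamma))   # 1 + 0.97 + 0.97**2
```

A lower discount factor makes the agent more short-sighted; a value closer to 1 weights distant rewards more heavily, usually at the cost of noisier targets.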

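### Implementing the agent class

The class below is named `PPOAgent`: it shares one actor-critic network across the batch of environments and optimizes a PPO-style clipped surrogate objective.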

In [None]:
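# PPO-style agent: one shared actor-critic network, clipped surrogate objective
class PPOAgent:
    def __init__(self, action_size, learning_rate=5e-4, gamma=0.97, lamda=0.95, clip_epsilon=0.2):
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.action_size = action_size
        self.gamma = gamma
        self.lamda = lamda  # stored but unused: step() uses a one-step TD advantage
        self.clip_epsilon = clip_epsilon

        self.network = Network(action_size).to(self.device)
        self.optimizer = torch.optim.Adam(self.network.parameters(), lr=learning_rate)

    def act(self, state):
        state = torch.as_tensor(np.asarray(state), dtype=torch.float32, device=self.device)
        single = state.dim() == 3  # one (C, H, W) state instead of a batch
        if single:
            state = state.unsqueeze(0)
        with torch.no_grad():  # no gradients are needed while collecting experience
            action_values, _ = self.network(state)
        policy = F.softmax(action_values, dim=-1)
        dist = Categorical(policy)
        action = dist.sample()
        log_prob, entropy = dist.log_prob(action), dist.entropy()
        if single:
            return action.item(), log_prob[0], entropy[0]
        return action.cpu().numpy(), log_prob, entropy

    def step(self, states, actions, rewards, next_states, dones, old_log_probs, old_values=None):
        # Convert rollout data to tensors and merge any (time, env) axes into one batch axis.
        # old_values is accepted for compatibility but not used by this update.
        def to_tensor(x, dtype):
            return torch.as_tensor(np.asarray(x), dtype=dtype, device=self.device)

        states = to_tensor(states, torch.float32)
        states = states.reshape(-1, *states.shape[-3:])
        next_states = to_tensor(next_states, torch.float32)
        next_states = next_states.reshape(-1, *next_states.shape[-3:])
        actions = to_tensor(actions, torch.int64).reshape(-1)
        rewards = to_tensor(rewards, torch.float32).reshape(-1)
        dones = to_tensor(dones, torch.float32).reshape(-1)
        if not torch.is_tensor(old_log_probs):
            old_log_probs = torch.stack(list(old_log_probs))
        old_log_probs = old_log_probs.detach().reshape(-1).to(self.device)

        action_values, state_values = self.network(states)
        with torch.no_grad():
            _, next_state_values = self.network(next_states)
        # Drop the trailing dimension so values align with rewards of shape (N,)
        state_values = state_values.squeeze(-1)
        next_state_values = next_state_values.squeeze(-1)
        probs = F.softmax(action_values, dim=-1)
        dist = Categorical(probs)
        new_log_probs = dist.log_prob(actions)
        entropy = dist.entropy()

        # One-step TD target; the bootstrap term is masked out at episode ends
        target_values = rewards + self.gamma * next_state_values * (1 - dones)
        delta = target_values - state_values
        advantage = delta.detach()

        # Clipped surrogate objective
        ratio = (new_log_probs - old_log_probs).exp()
        surrogate1 = ratio * advantage
        surrogate2 = torch.clamp(ratio, 1 - self.clip_epsilon, 1 + self.clip_epsilon) * advantage
        actor_loss = -torch.min(surrogate1, surrogate2).mean()

        critic_loss = F.mse_loss(state_values, target_values.detach())

        entropy_loss = -entropy.mean()

        # entropy_loss is the negative entropy, so adding it rewards exploration
        loss = actor_loss + 0.5 * critic_loss + 0.01 * entropy_loss

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()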
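
The clipping in the actor loss is easiest to see on concrete numbers. The following is a minimal NumPy sketch of the same formula, not the notebook's PyTorch code; `clipped_surrogate` and the sample probabilities are made up for illustration.

```python
import numpy as np

def clipped_surrogate(new_log_probs, old_log_probs, advantages, clip_epsilon=0.2):
    """Negative mean of min(ratio * A, clip(ratio, 1 - eps, 1 + eps) * A)."""
    ratio = np.exp(new_log_probs - old_log_probs)
    unclipped = ratio * advantages
    clipped = np.clip(ratio, 1 - clip_epsilon, 1 + clip_epsilon) * advantages
    return -np.minimum(unclipped, clipped).mean()

old = np.log(np.array([0.5, 0.5]))
new = np.log(np.array([0.75, 0.25]))  # probability ratios 1.5 and 0.5
adv = np.array([1.0, -1.0])

# Sample 1: ratio 1.5 with positive advantage is capped at 1.2 -> contributes 1.2
# Sample 2: ratio 0.5 with negative advantage is floored at 0.8 -> contributes -0.8
loss = clipped_surrogate(new, old, adv)
print(loss)
```

In both samples the clipped term is the smaller one, so the policy gets no extra credit for moving further than 20% away from the probabilities it had when the data was collected.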


### Initializing the PPO agent

In [None]:
agent = PPOAgent(number_actions)

### Evaluating our agent over several episodes

In [None]:
# Evaluate the model
def evaluate(agent, env, n_episodes=10):
    episodes_rewards = []
    for _ in range(n_episodes):
        state, _ = env.reset()
        total_reward = 0
        while True:
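            action = agent.act(state)
            # Gymnasium's step returns (obs, reward, terminated, truncated, info)
            state, reward, terminated, truncated, _ = env.step(action[0])
            total_reward += reward
            if terminated or truncated:
                break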
        episodes_rewards.append(total_reward)
    return episodes_rewards

### Stepping multiple environments as a batch

In [None]:
# Environment batch for parallel environments
class EnvBatch:
    def __init__(self, n_envs=10):
        self.envs = [make_env() for _ in range(n_envs)]

    def reset(self):
        _states = []
        for env in self.envs:
            _states.append(env.reset()[0])
        return np.array(_states)

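    def step(self, actions):
        # Each env.step returns (obs, reward, terminated, truncated, info)
        results = [env.step(int(a)) for env, a in zip(self.envs, actions)]
        next_states, rewards, terminated, truncated, infos = map(list, zip(*results))
        next_states = np.array(next_states)
        rewards = np.array(rewards, dtype=np.float32)
        dones = np.logical_or(terminated, truncated)
        # Restart finished environments so the batch always holds live states
        for i in range(len(self.envs)):
            if dones[i]:
                next_states[i] = self.envs[i].reset()[0]
        return next_states, rewards, dones, infos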
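
The batching pattern can be tried without Atari installed. The sketch below uses `CountdownEnv`, a hypothetical stand-in that mimics Gymnasium's five-value `step` return, to show how per-environment results are transposed with `zip(*...)` and how finished environments are reset in place.

```python
import numpy as np

class CountdownEnv:
    """Hypothetical stand-in for a Gymnasium env; terminates after `length` steps."""
    def __init__(self, length):
        self.length = length
        self.t = 0

    def reset(self):
        self.t = 0
        return np.zeros(2, dtype=np.float32), {}

    def step(self, action):
        self.t += 1
        obs = np.full(2, self.t, dtype=np.float32)
        terminated = self.t >= self.length
        return obs, 1.0, terminated, False, {}  # truncated is always False here

def step_all(envs, actions):
    results = [env.step(a) for env, a in zip(envs, actions)]
    obs, rewards, terminated, truncated, infos = zip(*results)  # transpose per-env tuples
    obs = np.stack(obs)
    dones = np.logical_or(terminated, truncated)
    for i, done in enumerate(dones):
        if done:
            obs[i] = envs[i].reset()[0]  # replace the terminal observation with a fresh one
    return obs, np.array(rewards, dtype=np.float32), dones, infos

envs = [CountdownEnv(2), CountdownEnv(3)]
for env in envs:
    env.reset()

for _ in range(2):
    obs, rewards, dones, _ = step_all(envs, [0, 0])

print(dones)  # the first environment finished on the second step
print(obs)    # its observation was replaced by the reset state
```

Note that the environments are stepped one after another in a single process; the batch gives the agent more varied data per update but does not by itself run anything concurrently.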

### Training the PPO agent

In [None]:
def train_ppo(agent, env_batch, n_steps=2048, batch_size=64):
    batch_states = env_batch.reset()

    for i in range(n_steps):
        states, actions, rewards, next_states, dones, log_probs = [], [], [], [], [], []

        # Collect batch_size steps from every environment in the batch
        for _ in range(batch_size):
            action, log_prob, entropy = agent.act(batch_states)
            batch_next_states, batch_rewards, batch_dones, _ = env_batch.step(action)

            states.append(batch_states)
            actions.append(action)
            rewards.append(batch_rewards)
            next_states.append(batch_next_states)
            dones.append(batch_dones)
            log_probs.append(log_prob)
            # Continue the rollout from the states the environments are now in
            batch_states = batch_next_states

        # old_values is not used by the update, so None is passed in its place
        agent.step(states, actions, rewards, next_states, dones, log_probs, None)

        if i % 1000 == 0:
            # `env` is the single evaluation environment created earlier in the notebook
            print(f"Step {i}: Average Reward: {np.mean(evaluate(agent, env, n_episodes=10))}")


  critic_loss = F.mse_loss(target_state_value.detach(), state_value)
  0%|          | 3/3001 [00:26<5:47:01,  6.95s/it] 

Step 0: Average Reward: 8.7


 33%|███▎      | 1003/3001 [01:57<1:26:35,  2.60s/it]

Step 1000: Average Reward: 10.1


 67%|██████▋   | 2003/3001 [03:27<41:13,  2.48s/it]

Step 2000: Average Reward: 11.7


100%|██████████| 3001/3001 [05:02<00:00,  9.93it/s]

Step 3000: Average Reward: 11.3
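
The agent's constructor accepts `lamda=0.95`, but the update shown in this notebook uses a one-step TD advantage. One possible refinement, sketched here as an assumption and not as the method used for the results above, is Generalized Advantage Estimation (GAE) over each collected rollout. The function `gae` and its argument layout are hypothetical.

```python
import numpy as np

def gae(rewards, values, next_values, dones, gamma=0.97, lamda=0.95):
    """GAE for one environment; all inputs are 1-D arrays of length T."""
    advantages = np.zeros(len(rewards), dtype=np.float64)
    running = 0.0
    for t in reversed(range(len(rewards))):
        not_done = 1.0 - dones[t]
        # One-step TD error, with the bootstrap masked at episode ends
        delta = rewards[t] + gamma * next_values[t] * not_done - values[t]
        # Exponentially weighted sum of future TD errors, cut at episode ends
        running = delta + gamma * lamda * not_done * running
        advantages[t] = running
    return advantages

rewards = np.array([1.0, 1.0])
values = np.array([0.0, 0.0])
next_values = np.array([0.0, 0.0])
dones = np.array([0.0, 1.0])

print(gae(rewards, values, next_values, dones, gamma=0.5, lamda=1.0))  # [1.5 1. ]
print(gae(rewards, values, next_values, dones, gamma=0.5, lamda=0.0))  # [1. 1.]
```

With `lamda=0` the estimate reduces to the one-step TD error used in the agent's update; values closer to 1 trade lower bias for higher variance.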





### Running the PPO model

In [None]:
env_batch = EnvBatch(number_environments)
train_ppo(agent, env_batch)

## Part 3 - Visualizing the results

In [None]:
# Show video of the trained agent
import glob
import io
import base64
import imageio
from IPython.display import HTML, display

def show_video_of_model(agent, env):
    state, _ = env.reset()
    done = False
    frames = []
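    while not done:
        frame = env.render()
        frames.append(frame)
        action = agent.act(state)
        # Gymnasium's step returns (obs, reward, terminated, truncated, info)
        state, reward, terminated, truncated, _ = env.step(action[0])
        done = terminated or truncated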
    env.close()
    imageio.mimsave('video.mp4', frames, fps=30)

show_video_of_model(agent, env)

def show_video():
    mp4list = glob.glob('*.mp4')
    if len(mp4list) > 0:
        mp4 = mp4list[0]
        video = io.open(mp4, 'r+b').read()
        encoded = base64.b64encode(video)
        display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")

show_video()

