In [None]:
# Set Up:
import sys
USING_COLAB = 'google.colab' in sys.modules

if USING_COLAB:
    !apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1
    !pip install -U renderlab
    !pip install -U colabgymrender
    !pip install -U moviepy==0.2.3.5
    !pip install imageio==2.4.1
    !pip install --upgrade AutoROM
    !AutoROM --accept-license
    !pip install stable_baselines3  
    !pip install "gymnasium[atari, accept-rom-licesnse]"

import numpy as np
import ale_py
import shimmy
import gymnasium as gym
import renderlab as rl
import random
import matplotlib.pyplot as plt
from colabgymrender.recorder import Recorder
from copy import deepcopy

from torch.utils.data import DataLoader
from torch import nn
import torch

from tqdm import tqdm, trange

seed = 24
data_seed = 700



In [None]:

# Report the library versions in use and whether PyTorch can see a CUDA GPU.
print(f"gym: {gym.__version__}")
print(f"ale_py: {ale_py.__version__}")
print(f"GPU is available: {torch.cuda.is_available()}")

In [None]:
# Show the CUDA version reported by `torch.version` and the installed PyTorch version.
print(torch.version.cuda)
print(torch.__version__)

# Functions from A5 starter code

In [None]:
# Set the seed to ensure reproducibility
def reseed(seed, env=None):
    """Seed every random number generator this notebook relies on.

    Seeds PyTorch, Python's `random` module and NumPy's global generator,
    in that order.

    Args:
        seed: Seed value handed to each generator.
        env: Optional gym environment. When given, the `_np_random`
            attribute of its unwrapped environment is replaced with a
            generator created from the same seed.
    """
    for seed_fn in (torch.manual_seed, random.seed, np.random.seed):
        seed_fn(seed)

    if env is None:
        return

    # np_random returns a tuple; element 0 is stored on the environment.
    env.unwrapped._np_random = gym.utils.seeding.np_random(seed)[0]

# Apply the notebook-level `seed` as soon as `reseed` is defined.
reseed(seed)

In [None]:
def visualize(env_name='ALE/SpaceInvaders-v5', algorithm=None, video_name="test", env_args={}):
    """Roll out a policy for a single episode and show or save a video of it.

    On Colab the episode is recorded with renderlab's `RenderFrame` into
    `./video` and played inline. Otherwise frames are written with OpenCV to
    `videos/{video_name}.mp4` (the `videos` directory is created if missing).

    Args:
        env_name: Id of the gym environment; it is instantiated with `gym.make`.
        algorithm: Object exposing `predict(obs)`; element 0 of its return
            value is used as the action. If no algorithm is passed in, actions
            are sampled from the environment's action space.
        video_name (str): Name for the mp4 file of the episode (omit .mp4).
            Only used when running on a local machine.
        env_args (dict): Extra keyword arguments forwarded to `gym.make`.
            The dict is copied, never modified; `render_mode` is always set to
            'rgb_array' on the copy.
    """

    def get_action(obs):
        if not algorithm:
            return env.action_space.sample()
        else:
            return algorithm.predict(obs)[0]

    # Work on a copy so neither the caller's dict nor the shared default
    # argument picks up the forced render mode.
    make_kwargs = dict(env_args)
    make_kwargs['render_mode'] = 'rgb_array'

    if USING_COLAB:
        from renderlab import RenderFrame

        directory = './video'
        env = gym.make(env_name, **make_kwargs)
        env = RenderFrame(env, directory)
        obs, info = env.reset()

        while True:
            action = get_action(obs)
            obs, reward, done, truncate, info = env.step(action)

            # Stop on truncation as well as termination; otherwise a
            # time-limited episode would keep stepping a finished env.
            if done or truncate:
                break

        env.play()
    else:
        import os

        import cv2

        # cv2.VideoWriter does not create missing directories.
        os.makedirs("videos", exist_ok=True)

        env = gym.make(env_name, **make_kwargs)
        video = None
        try:
            # NOTE(review): the writer below is opened at 24 fps; confirm this
            # metadata override is still needed.
            env.metadata['render_fps'] = 60
            obs, info = env.reset()

            # Size the writer from an actual frame instead of hard-coding the
            # Space Invaders resolution; VideoWriter expects (width, height).
            height, width = env.render().shape[:2]
            video = cv2.VideoWriter(
                f"videos/{video_name}.mp4",
                cv2.VideoWriter_fourcc(*'mp4v'),
                24,
                (width, height),
            )

            while True:
                action = get_action(obs)
                obs, reward, done, truncate, info = env.step(action)
                if done or truncate:
                    break

                # Reverse the channel axis: rgb_array frame -> BGR order for OpenCV.
                im = env.render()[:, :, ::-1]
                video.write(im)
        finally:
            # Release the writer and the env even if the rollout raised.
            if video is not None:
                video.release()
            env.close()

        print(f"Video saved as {video_name}.mp4")

In [None]:
def evaluate_policy(actor, environment, num_episodes=100, progress=True):
    """Average the total episode reward of `actor` over several rollouts.

    Args:
        actor: Object with a `predict(obs)` method; element 0 of its return
            value is used as the action.
        environment: Vectorized environment whose `reset()` returns an
            observation and whose `step(action)` returns
            `(obs, reward, done, info)`. The loop treats `done` as a single
            truth value, so a one-environment VecEnv is expected.
        num_episodes: Number of episodes to roll out and average over.
        progress: When True, show a tqdm progress bar over the episodes.

    Returns:
        The sum of all collected rewards divided by `num_episodes`,
        converted to a Python scalar with `.item()`.
    """
    reward_sum = 0
    episodes = trange(num_episodes) if progress else range(num_episodes)

    for _ in episodes:
        obs = environment.reset()

        # Step until the environment reports the episode as finished.
        while True:
            obs, reward, done, _info = environment.step(actor.predict(obs)[0])
            reward_sum += reward
            if done:
                break

    return (reward_sum / num_episodes).item()


In [None]:
# Environment configuration, plus a check at the end of this cell that
# visualization works (no algorithm is passed, so actions are random).
# The arguments are specific to Space Invaders; see
# https://ale.farama.org/environments/#2 for a description of the parameters.
# Training keeps the environment's default frameskip, while visualization sets
# frameskip=1 so we can clearly see what is happening.
# For DQN, evaluation uses the same environment settings as training.
env_id = "ALE/SpaceInvaders-v5"
train_env_args = dict(
    mode=0,
    difficulty=1,
    obs_type="rgb",
    full_action_space=False,
)
# Same settings as training, with frame skipping disabled.
visualize_env_args = dict(train_env_args, frameskip=1)
# Visualize one episode using the frameskip=1 settings defined above.
visualize(env_id, env_args=visualize_env_args)

# Initialization
from A5 starter code

In [None]:
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.vec_env.base_vec_env import VecEnv
from stable_baselines3.common.env_checker import check_env # for regular envs
from stable_baselines3.common.env_util import make_vec_env
# not using make_atari_env bc that wraps it in an atari wrapper only for v4 stuff

In [None]:
# Single-environment vectorized env; used below for evaluation and as the DQN training env.
env1 = make_vec_env(env_id, n_envs=1, env_kwargs=train_env_args)

In [None]:
# Vectorized env holding `num_envs` copies of the training environment; PPO trains on it below.
num_envs = 8
env8 = make_vec_env(env_id, n_envs=num_envs, env_kwargs=train_env_args)

In [None]:
# Periodic evaluation callback for the PPO run (passed to `ppomodel.learn` below).
# NOTE(review): `eval_freq` appears to be counted in callback calls rather than
# environment timesteps; PPO trains on the 8-env `env8`, so confirm that
# 100_000 gives the intended evaluation interval.
eval_callback = EvalCallback(
    eval_env=env1,
    # evaluation schedule and behaviour
    eval_freq=100_000,
    n_eval_episodes=20,
    deterministic=True,
    render=False,
    # no extra callbacks chained onto evaluation
    callback_on_new_best=None,
    callback_after_eval=None,
    # output locations
    best_model_save_path="./best_model/",
    log_path="./eval_logs/",
    # diagnostics
    verbose=1,
    warn=True,
)

# PPO Model

In [None]:
# PPO agent with a CNN policy, trained on the 8-environment `env8`.
ppomodel = PPO(
    policy="CnnPolicy",
    env=env8,
    # optimisation
    learning_rate=2.5e-4,
    max_grad_norm=0.5,
    # rollout and minibatch sizes
    n_steps=64,
    batch_size=512,
    n_epochs=4,
    # discounting and advantage estimation
    gamma=0.99,
    gae_lambda=0.95,
    # loss terms
    clip_range=0.1,
    ent_coef=0.01,
    vf_coef=0.5,
)

## Training PPO model

In [None]:
# Re-apply the seed right before training.
reseed(seed)

# If you get the error "Only one live display may be active at once", either
# restart Colab or set progress_bar=False.
total_timesteps = 5_000_000
ppomodel.learn(
    total_timesteps=total_timesteps,
    callback=eval_callback,
    progress_bar=True,
)

In [None]:
# Mean episode reward of the trained PPO model on `env1` (evaluate_policy defaults to 100 episodes).
print(evaluate_policy(ppomodel, env1))

In [None]:
# Record one episode of the trained PPO policy using the frameskip=1 visualization settings.
visualize('ALE/SpaceInvaders-v5', algorithm=ppomodel, video_name='ppoActor', env_args=visualize_env_args)

In [None]:
# Persist the trained PPO model under models/.
ppomodel.save("models/ppo_space_invaders_5m")

# DQN Model

DQN needs to train on a different environment (with different parameters) than PPO because of the memory required by its replay buffer.

In [None]:
from stable_baselines3 import DQN

# DQN is trained on the single-environment `env1` rather than `env8`
# because of memory constraints.
dqnmodel = DQN(
    # CNN policy for visual inputs
    policy="CnnPolicy",
    env=env1,
    learning_rate=1e-4,
    # replay buffer kept small
    buffer_size=29_000,
    # start learning early
    learning_starts=10_000,
    batch_size=32,
    # train frequently
    train_freq=2,
    # refresh the target network frequently
    target_update_interval=5_000,
    # exploration schedule
    exploration_fraction=0.1,
    exploration_final_eps=0.01,
    gamma=0.99,
)

In [None]:
# Evaluate DQN on its own environment instance. DQN trains on `env1`, and the
# evaluation callback resets and steps its eval env while training is running,
# so sharing that instance would disturb the episode the training loop is in
# the middle of. The eval env is built with the same settings as training.
dqn_eval_env = make_vec_env(env_id, n_envs=1, env_kwargs=train_env_args)

eval_callback_dqn = EvalCallback(
    eval_env=dqn_eval_env,
    callback_on_new_best=None,
    callback_after_eval=None,
    n_eval_episodes=20,
    eval_freq=100_000,
    log_path='./dqn_eval_logs/',
    best_model_save_path='./dqn_best_model/',
    deterministic=True,
    render=False,
    verbose=1,
    warn=True
)

In [None]:
# Re-apply the seed right before training.
reseed(seed)

# If you get the error "Only one live display may be active at once", either
# restart Colab or set progress_bar=False.
total_timesteps = 5_000_000
dqnmodel.learn(
    total_timesteps=total_timesteps,
    callback=eval_callback_dqn,
    progress_bar=True,
)

In [None]:
# Mean episode reward of the trained DQN model on `env1` (evaluate_policy defaults to 100 episodes).
print(evaluate_policy(dqnmodel, env1))

In [None]:
# Record one episode of the trained DQN policy using the frameskip=1 visualization settings.
visualize('ALE/SpaceInvaders-v5', algorithm=dqnmodel, video_name='dqnActor', env_args=visualize_env_args)