In [None]:
!pip install gym[atari]
!pip install stable-baselines3[extra]

In [None]:
import os
import matplotlib.pyplot as plt
from stable_baselines3 import DQN
from stable_baselines3.common.env_util import make_atari_env
from stable_baselines3.common.vec_env import VecFrameStack
from stable_baselines3.common.atari_wrappers import AtariWrapper
from stable_baselines3.common.evaluation import evaluate_policy
import cv2
import gym
import random
import numpy as np
import torch

In [None]:
from google.colab import drive
from google.colab import files

In [None]:
# Mount Google Drive (google.colab helper) at /content/drive.
drive.mount('/content/drive')

# Training Stable Baselines 3 DQN on Breakout

In [None]:
from stable_baselines3.common.callbacks import BaseCallback
import matplotlib.pyplot as plt
import cv2
import numpy as np

class CustomCallback(BaseCallback):
    """Callback that tracks episode rewards, saves screenshots and plots metrics.

    Args:
        save_path: Directory that receives the screenshots and the metrics
            figure. It is created on first write if it does not exist.
        screenshot_freq: A screenshot is saved (and the running max / average
            score printed) whenever ``num_timesteps`` is a multiple of this
            value. A falsy value (``0`` / ``None``) disables screenshots.
        smoothing_window: Width of the moving average used by ``smooth``.

    Attributes filled during training:
        all_episode_rewards: One entry per finished episode.
        losses / q_values: One entry per callback step, read from the step's
            ``info`` dict under ``'loss'`` / ``'mean_q_value'`` (0 if absent).
            NOTE(review): Stable-Baselines3 environments normally do not put
            these keys into ``infos``, so both series are probably all zeros;
            confirm where the values are expected to come from.
    """

    def __init__(self, save_path, screenshot_freq, smoothing_window=50):
        super().__init__()
        self.save_path = save_path
        self.screenshot_freq = screenshot_freq
        self.all_episode_rewards = []
        self.losses = []
        self.q_values = []
        self.smoothing_window = smoothing_window

    def _ensure_save_dir(self):
        """Create ``save_path`` if needed; cv2.imwrite fails silently otherwise."""
        os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self):
        """Record per-step metrics; periodically save a screenshot and print scores.

        Returns:
            Always ``True`` so that training continues.
        """
        infos = self.locals.get("infos", [{}])
        dones = self.locals.get("dones", [False])

        # Store loss and Q-values (taken from the last environment's info dict)
        last_info = infos[-1]
        self.losses.append(last_info.get('loss', 0))
        self.q_values.append(last_info.get('mean_q_value', 0))

        # Only count a reward when the info dict actually carries an "episode"
        # entry. A `done` without it (e.g. a life-loss reset) used to append a
        # spurious 0 and drag the average score down.
        for env_done, env_info in zip(dones, infos):
            episode = env_info.get("episode")
            if env_done and episode is not None:
                self.all_episode_rewards.append(episode.get("r", 0))

        # Take a screenshot and print scores every 'screenshot_freq' steps
        if self.screenshot_freq and self.num_timesteps % self.screenshot_freq == 0:
            frame = self.training_env.render(mode='rgb_array')
            if frame is not None:
                self._ensure_save_dir()
                target = f"{self.save_path}/screenshot_{self.num_timesteps}.png"
                # The 'rgb_array' frame is RGB while OpenCV writes BGR, so the
                # channels must be swapped to keep the colours correct.
                if not cv2.imwrite(target, cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)):
                    print(f"Warning: could not write screenshot to {target}")

            # Calculate and print current max and average score
            if self.all_episode_rewards:
                max_score = max(self.all_episode_rewards)
                avg_score = sum(self.all_episode_rewards) / len(self.all_episode_rewards)
                print(f"Step: {self.num_timesteps}, Max Score: {max_score}, Average Score: {avg_score}")

        return True

    def smooth(self, data):
        """Return the moving average of ``data``.

        The window is ``smoothing_window`` clamped to ``[1, len(data)]`` so
        that short series are still averaged over real samples. An empty
        input yields an empty array instead of raising.
        """
        values = np.asarray(data, dtype=float)
        if values.size == 0:
            return values
        window = max(1, min(int(self.smoothing_window), values.size))
        return np.convolve(values, np.ones(window) / window, mode='valid')

    def plot_metrics(self):
        """Save a three-panel figure (rewards, loss, Q-values) to ``save_path``."""
        panels = (
            (self.all_episode_rewards, 'Smoothed Episode Rewards', 'Episodes',
             'Rewards', 'Smoothed Rewards per Episode'),
            (self.losses, 'Smoothed Loss', 'Steps',
             'Loss', 'Smoothed Loss over Time'),
            (self.q_values, 'Smoothed Q-Values', 'Steps',
             'Q-Values', 'Smoothed Q-Values over Time'),
        )

        fig, axes = plt.subplots(1, 3, figsize=(15, 5))
        for ax, (series, label, xlabel, ylabel, title) in zip(axes, panels):
            ax.plot(self.smooth(series), label=label)
            ax.set_xlabel(xlabel)
            ax.set_ylabel(ylabel)
            ax.set_title(title)
            # One legend per panel; a single trailing plt.legend() would only
            # annotate the last subplot.
            ax.legend()

        fig.tight_layout()
        self._ensure_save_dir()
        fig.savefig(f"{self.save_path}/training_metrics.png")
        plt.close(fig)

In [None]:
# Locations on disk (placeholders: replace with real paths before running)
model_path_cnn = 'path_to_CNN model'
save_path = 'path to save screenshots to'

# One Breakout environment built by make_atari_env, with 4 frames stacked
env_cnn = VecFrameStack(
    make_atari_env('BreakoutNoFrameskip-v4', n_envs=1, seed=0),
    n_stack=4,
)

# Metrics / screenshot callback shared by every training run below
callback = CustomCallback(save_path=save_path, screenshot_freq=50000)


In [None]:
# Load the checkpoint once and keep training the same agent object.
# Reloading inside the loop threw away the replay buffer after every chunk
# (model.save() does not store it), and each learn() call restarted the
# timestep counter, and with it the exploration schedule and the
# screenshot file names.
loaded_model = DQN.load(model_path_cnn, env=env_cnn)

for i in range(20):
    # reset_num_timesteps=False continues counting from the current step,
    # so each call trains for 500000 additional steps.
    loaded_model.learn(
        total_timesteps=500000,
        callback=callback,
        reset_num_timesteps=False,
    )
    # Checkpoint after every chunk so an interrupted session loses little work
    loaded_model.save(model_path_cnn)

    if i > 0 and i % 2 == 0:
        callback.plot_metrics()

# Make sure the figure also covers the final chunk (i == 19 is odd and was
# never plotted by the periodic check above).
callback.plot_metrics()

# Evaluate DQN Agent

# Experiments on Gamma

Switch models by loading the checkpoint with a different gamma value: run one of the following cells, then the evaluation cell.

In [None]:
# Load the saved model with gamma=0.9 passed to DQN.load
# (presumably overriding the discount factor stored in the checkpoint).
# NOTE(review): gamma is normally only used when computing training targets,
# so changing it at load time likely has no effect on a pure evaluation of
# the same weights; confirm whether the model should be retrained per gamma.
loaded_model = DQN.load(model_path_cnn, env=env_cnn, gamma=0.9)

In [None]:
# Load the saved model with gamma=0.95 passed to DQN.load
# (presumably overriding the discount factor stored in the checkpoint).
loaded_model = DQN.load(model_path_cnn, env=env_cnn, gamma=0.95)

In [None]:
# Load the saved model with gamma=0.99 passed to DQN.load
# (presumably overriding the discount factor stored in the checkpoint).
loaded_model = DQN.load(model_path_cnn, env=env_cnn, gamma=0.99)

In [None]:
# Run 10 evaluation episodes, keeping the per-episode rewards and lengths
episode_rewards, episode_lengths = evaluate_policy(
    loaded_model,
    env_cnn,
    n_eval_episodes=10,
    render=False,
    return_episode_rewards=True,
)

# Summary statistics over the evaluated episodes
mean_reward, std_reward = np.mean(episode_rewards), np.std(episode_rewards)

# Best single-episode reward first, then mean and spread
print(np.max(episode_rewards))
print(f"Environment: Breakout, Mean Reward: {mean_reward}, Std: {std_reward}")

# Close the environment once the evaluation is finished
env_cnn.close()

# Experiments on Environments

Breakout Environment

In [None]:
# Load the checkpoint without attaching an environment. When the notebook is
# run top to bottom, env_cnn has already been closed by the previous
# evaluation cell and is only recreated in the next cell; evaluate_policy is
# given the environment explicitly, so the model does not need one here.
# This also matches how the Pong and Space Invaders sections load the model.
loaded_model = DQN.load(model_path_cnn)

In [None]:
# Create and wrap a fresh Breakout environment for evaluation.
# seed=0 matches every other environment in this notebook, so that repeated
# runs of the evaluation are comparable.
env_cnn = make_atari_env('BreakoutNoFrameskip-v4', n_envs=1, seed=0)
env_cnn = VecFrameStack(env_cnn, n_stack=4)

In [None]:
# Play 10 evaluation episodes on Breakout and collect per-episode results
episode_rewards, episode_lengths = evaluate_policy(
    loaded_model,
    env_cnn,
    n_eval_episodes=10,
    render=False,
    return_episode_rewards=True,
)

# Mean and standard deviation of the episode rewards
mean_reward, std_reward = np.mean(episode_rewards), np.std(episode_rewards)

# Report the best episode, then the aggregate statistics
print(np.max(episode_rewards))
print(f"Environment: Breakout, Mean Reward: {mean_reward}, Std: {std_reward}")

# Close the environment once the evaluation is finished
env_cnn.close()

Pong Environment

In [None]:
# Single seeded Pong environment from make_atari_env, with 4 frames stacked
env_pong = VecFrameStack(
    make_atari_env('PongNoFrameskip-v4', n_envs=1, seed=0),
    n_stack=4,
)

In [None]:
# Reload the saved model from model_path_cnn (no environment attached);
# the Pong environment is passed to evaluate_policy directly.
loaded_model = DQN.load(model_path_cnn)

In [None]:
# Play 10 evaluation episodes on Pong and collect per-episode results
episode_rewards, episode_lengths = evaluate_policy(
    loaded_model,
    env_pong,
    n_eval_episodes=10,
    render=False,
    return_episode_rewards=True,
)

# Mean and standard deviation of the episode rewards
mean_reward, std_reward = np.mean(episode_rewards), np.std(episode_rewards)

print(f"Environment: Pong, Mean Reward: {mean_reward}, Std: {std_reward}")

# Close the environment once the evaluation is finished
env_pong.close()

Space Invaders Environment

In [None]:
# The DQN paper's Space Invaders tweak is a frame *skip* of 3 instead of 4
# (with a skip of 4 the blinking lasers can become invisible). It is passed
# to the Atari wrapper via wrapper_kwargs.
env_SI = make_atari_env(
    'SpaceInvadersNoFrameskip-v4',
    n_envs=1,
    seed=0,
    wrapper_kwargs={'frame_skip': 3},
)
# The frame *stack* must stay at 4: the loaded model's network was trained on
# 4 stacked frames (see the Breakout environment), and a 3-frame observation
# does not match its expected input shape.
env_SI = VecFrameStack(env_SI, n_stack=4)

In [None]:
# Reload the saved model from model_path_cnn (no environment attached);
# the Space Invaders environment is passed to evaluate_policy directly.
loaded_model = DQN.load(model_path_cnn)

In [None]:
# Play 10 evaluation episodes on Space Invaders and collect per-episode results
episode_rewards, episode_lengths = evaluate_policy(
    loaded_model,
    env_SI,
    n_eval_episodes=10,
    render=False,
    return_episode_rewards=True,
)

# Mean and standard deviation of the episode rewards
mean_reward, std_reward = np.mean(episode_rewards), np.std(episode_rewards)

print(f"Environment: Space Invaders, Mean Reward: {mean_reward}, Std: {std_reward}")

# Close the environment once the evaluation is finished
env_SI.close()