In [1]:
!pip install gymnasium
!pip install stable-baselines3[extra]

Collecting gymnasium
  Downloading gymnasium-0.29.0-py3-none-any.whl (953 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m953.8/953.8 kB[0m [31m7.4 MB/s[0m eta [36m0:00:00[0m
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-0.29.0
Collecting stable-baselines3[extra]
  Downloading stable_baselines3-2.0.0-py3-none-any.whl (178 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m178.4/178.4 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting gymnasium==0.28.1 (from stable-baselines3[extra])
  Downloading gymnasium-0.28.1-py3-none-any.whl (925 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m925.5/925.5 kB[0m [31m8.3 MB/s[0m eta [36m0:00:00[0m
Collecting shimmy[atari]~=0.2.1 (from stable-baselines3[extra])
  Downloading Shim

# importing the libraries

In [None]:
import os
import gymnasium as gym
import numpy as np
from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback,StopTrainingOnRewardThreshold
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack
from stable_baselines3.dqn.policies import CnnPolicy
from gymnasium.utils.save_video import save_video
from gymnasium.wrappers import FrameStack,  ResizeObservation
from PIL import Image
import warnings
warnings.filterwarnings('ignore')


# CartPole Agent

In [None]:
class CartPoleDQNAgent:
    def __init__(self, name=None, env_name=None,eval_freq=20000, buffer_size=1000):
        self.name = name # name of the game
        self.env_name = env_name # environment name
        self.policy = "MlpPolicy" # policy
        self.eval_freq = eval_freq # evaluation frequency
        self.buffer_size = buffer_size # buffer size for the replay buffer
        self.log_path = os.path.join('/content/drive/MyDrive/Colab_Notebooks/my_atari_games/Training/DQN_' + self.name + '_Log') # path for loging the training  data
        self.save_path = os.path.join('/content/drive/MyDrive/Colab_Notebooks/my_atari_games/Saved_Models/DQN_' + self.name +'_Model') # path for saving the trained model
        self.env = self.make_environment() #function that creates the environment and agent.
        self.model = self._build_dqn() #function that builds the DQN model.

    def make_environment(self): # A call to the function that creates the environment
        env = gym.make(self.env_name, render_mode="rgb_array") # creates the environment and agent
        env = DummyVecEnv([lambda: env]) #creates a vectorized dummy environment
        return env # returns the created  environment.

    def _build_dqn(self): # A call to the function that builds the DQN model
        model = DQN(self.policy, self.env, verbose=0, tensorboard_log=self.log_path, buffer_size=self.buffer_size) # creates the DQN model
        return model # returns the created DQN model


    def _play_one_episode(self): # A call to the function that plays one episode
          obs = self.env.reset() # resets the environment
          done = False # sets the done flag
          score = 0 # sets the score to zero

          while not done: # loops until the done flag is set
              action = self.env.action_space.sample() # selects an action from a sample space
              obs, reward, done, *info= self.env.step([action]) # takes the action and returns the observation, reward, done, and info
              score += reward # Updates the score

          return score # returns the score value


    def play_episodes(self, num_episodes=10, play_type ="random"): # A call to the function that plays episodes
        if play_type == "random": # if the play type is random
          print(f"Playing the {self.name} game randomly for {num_episodes} episodes") # prints the message
          scores = [self._play_one_episode() for _ in range(num_episodes)] # creates a list of scores
          for episode, score in enumerate(scores, 1): # loops through the list of scores
            print(f"Episode {episode}: {score[0]}") # prints the score

        if play_type == "predict": # if the play type is predict
          episode_rewards = [] # creates a list of episode rewards
          frames = [] # creates a list of frames for the images

          for episode in range(num_episodes): # loops through the number of episodes
              obs = self.env.reset() # resets the environment
              done = False # sets the done flag
              score = 0 # sets the score to zero

              while not done: # loops until the done flag is set
                  action, _ = self.model.predict(obs) # predicts the action to take from the observation
                  obs, reward, done, *info= self.env.step(action) # takes the action and returns the observation, reward, done, and info
                  score += reward # Updates the score
                  frame = Image.fromarray(self.env.render()) #Craptures the frame of image  from the environment
                  frame = np.array(frame) # converts the frame to numpy
                  frames.append(frame) # adds the frame to the list

              episode_rewards.append(score) # adds the score to the list

              print(f"Episode {episode+1}: {score[0]}") # prints the score

          video_path =  os.path.join(self.save_path, self.name + "_Agent_play") # video path


          save_video(frames, video_path, fps= 30, name_prefix =f"{self.name}-agent-play") # saves the video


    def train(self, time_steps=None, stop_value=None): # A call to the function that trains the agent
        stop_callback = StopTrainingOnRewardThreshold(reward_threshold=stop_value, verbose=0) # creates the stop callback, assigns the reward threshold so training can stop
        eval_callback = EvalCallback(self.env, callback_on_new_best=stop_callback,
                                     eval_freq=self.eval_freq, best_model_save_path=self.save_path) # creates the eval callback, checks if the reward has been achieved
        self.model.learn(total_timesteps=time_steps, callback=eval_callback) # trains the model


    def evaluate_policy(self, episodes=None): # A call to the function that evaluates the policy
        mean_reward, reward_std = evaluate_policy(self.model, self.env, n_eval_episodes=episodes) # evaluates the policy
        print(f"Mean reward over {episodes} episodes is {mean_reward} with a standard deviation of {reward_std}") # prints the mean reward and standard deviation

    def close_env(self): # A call to the function that closes the environment
        self.env.close() # closes the environment




In [None]:
#create the agent and create the environment
CartPole_agent = CartPoleDQNAgent(name="CartPole", env_name="CartPole-v1")

In [None]:
#Play the cart pole game randomly for 20 episodes
CartPole_agent.play_episodes(num_episodes=20)

In [None]:
#test out the agent with the cart pole game
CartPole_agent.train(time_steps=1000000, stop_value=500)

In [None]:
#test out the agent with the cart pole game
CartPole_agent.play_episodes(num_episodes=10, play_type="predict")

In [None]:
#close the environment
CartPole_agent.close_env()

# DQNAgent for SpaceInvaders and Pac-Man

In [5]:

class DQNAgent: # A class that creates the DQN model
    def __init__(self, name=None, env_name=None,eval_freq=20000, buffer_size=1000):
        self.name = name # name of the game
        self.env_name = env_name# environment name
        # self.policy = "MultiInputPolicy" # policy
        self.eval_freq = eval_freq # evaluation frequency
        self.buffer_size = buffer_size # buffer size for the replay buffer
        self.log_path = os.path.join('/content/drive/MyDrive/Colab_Notebooks/my_atari_games/Training/DQN_' + self.name + '_Log') # path for loging the training  data
        self.save_path = os.path.join('/content/drive/MyDrive/Colab_Notebooks/my_atari_games/Saved_Models/DQN_' + self.name +'_Model') # path for saving the trained model
        self.env = self.make_environment() #function that creates the environment
        self.model = self._build_dqn() #function that builds the DQN model

    def make_environment(self): # A call to the function that creates the environment
        env = gym.make(self.env_name, render_mode="rgb_array") # creates the environment and agent
        env =  ResizeObservation(env,84) #Resize the observation
        # env = FrameStack(env, num_stack=4) # stacks the frames
        return env # returns the created  environment.

    def _build_dqn(self): # A call to the function that builds the DQN model
        model = DQN(CnnPolicy, self.env, verbose=0, tensorboard_log=self.log_path, buffer_size=self.buffer_size) # creates the DQN model
        return model # returns the created DQN model

    def _play_one_episode(self): # A call to the function that plays one episode
        obs, _ = self.env.reset() # resets the environment
        done = False # sets the done flag
        score = 0 # sets the score to zero

        while not done: # loops until the done flag is set
            action= self.env.action_space.sample()  # selects an action from a sample space randomly
            obs, reward, done, *info = self.env.step(action) # takes the action and returns the observation, reward, done, and info
            score += reward # Updates the score

        return score # returns the score value


    def play_episodes(self, num_episodes=10, play_type ="random"): # A call to the function that plays episodes
        if play_type == "random": # if the play type is random
          print(f"Playing the {self.name} game randomly for {num_episodes} episodes") # prints the message
          scores = [self._play_one_episode() for _ in range(num_episodes)] # creates a list of scores
          for episode, score in enumerate(scores, 1): # loops through the list of scores
            print(f"Episode {episode}: {score}") # prints the score

        if play_type == "predict": # if the play type is predict
          episode_rewards = [] # creates a list of episode rewards
          frames = [] # creates a list of frames for the images

          for episode in range(num_episodes): # loops through the number of episodes
              obs, _ = self.env.reset() # resets the environment
              done = False # sets the done flag
              score = 0 # sets the score to zero

              while not done: # loops until the done flag is set
                  action, _ = self.model.predict(obs) # predicts the action to take from the observation
                  obs, reward, done, *info= self.env.step(action) # takes the action and returns the observation, reward, done, and info
                  score += reward # Updates the score
                  frame = Image.fromarray(self.env.render()) #Craptures the frame of image  from the environment
                  frame = np.array(frame) # converts the frame to numpy
                  frames.append(frame)# adds the frame to the list

              episode_rewards.append(score) # adds the score to the list

              print(f"Episode {episode+1}: {score}")# prints the score

          video_path =  os.path.join(self.save_path, self.name + "_Agent_play") # video path


          save_video(frames, video_path, fps=30, name_prefix =f"{self.name}-agent-play") # saves the video


    def train(self, time_steps=None, stop_value=None): # A call to the function that trains the agent
        stop_callback = StopTrainingOnRewardThreshold(reward_threshold=stop_value, verbose=0) # creates the stop callback, assigns the reward threshold so training can stop
        eval_callback = EvalCallback(self.env, callback_on_new_best=stop_callback,
                                     eval_freq=self.eval_freq, best_model_save_path=self.save_path) # creates the eval callback, checks if the reward has been achieved
        self.model.learn(total_timesteps=time_steps, callback=eval_callback) # trains the model

    def evaluate_policy(self, episodes=None): # A call to the function that evaluates the policy
        mean_reward, reward_std = evaluate_policy(self.model, self.env, n_eval_episodes=episodes) # evaluates the policy
        print(f"Mean reward over {episodes} episodes is {mean_reward} with a standard deviation of {reward_std}") # prints the mean reward and standard deviation

    def load_best_model(self):
        best_model = DQN.load(self.save_path + "/best_model")
        return best_model

    def save_model(self):
        return self.model.save(self.save_path)

    def close_env(self): # A call to the function that closes the environment
        self.env.close() # closes the environment




# SpaceInvaders

In [11]:
#initialize the agent and create the environment
SpaceInvaders_agent = DQNAgent(name="SpaceInvaders", env_name="SpaceInvaders-v4")

In [None]:
#Play the space invaders game randomly for 20 episodes
SpaceInvaders_agent.play_episodes(num_episodes=20)

In [16]:
#train the agent
SpaceInvaders_agent.train(time_steps=1000000, stop_value=1000)

In [14]:
SpaceInvaders_agent.evaluate_policy(episodes=10)

Mean reward over 10 episodes is 205.0 with a standard deviation of 90.69178573608527


In [None]:
# test out the agent with the space invaders game
SpaceInvaders_agent.play_episodes(num_episodes=10, play_type="predict")

In [None]:
#Close the environment
SpaceInvaders_agent.close_env()

# Pacman

In [None]:
#initialize the agent and create the environment
Pacman_agent_agent = DQNAgent(name="Pacman", env_name="MsPacman-v4")

In [None]:
#Play the pacman game randomly for 20 episodes
Pacman_agent_agent.play_episodes(num_episodes=20)

In [None]:
#train the agent
Pacman_agent_agent.train(time_steps=1000000, stop_value=1000)

In [None]:
#evaluate the policy used by the agent
Pacman_agent_agent.evaluate_policy(episodes=10)

In [None]:
Pacman_agent_agent.save_model()

In [None]:
# test out the agent with the pacman game
Pacman_agent_agent.play_episodes(num_episodes=10, play_type="predict")

In [None]:
#Close the environment
Pacman_agent_agent.close_env()