In [1]:
# One-time setup (uncomment to install into the kernel's environment):
# %pip install gymnasium
# %pip install "stable-baselines3[extra]"

# importing the libraries

In [2]:
import os
import gymnasium as gym
import numpy as np
from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback, StopTrainingOnRewardThreshold
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack
from stable_baselines3.dqn.policies import CnnPolicy
from gymnasium.wrappers import FrameStack, ResizeObservation
from gymnasium.utils.save_video import save_video
from PIL import Image
import warnings

warnings.filterwarnings('ignore')


# CartPole Agent

In [3]:
class CartPoleDQNAgent:
    """DQN agent with an MLP policy for a vector-observation Gymnasium task.

    The constructor creates a single-environment ``DummyVecEnv`` around
    ``gym.make(env_name, render_mode="rgb_array")`` and a Stable-Baselines3
    ``DQN`` model bound to it.

    Args:
        name: Short label used in log/model directory names and in messages.
        env_name: Gymnasium environment id, e.g. ``"CartPole-v1"``.
        eval_freq: Run an evaluation every ``eval_freq`` training steps.
        buffer_size: Replay buffer capacity passed to ``DQN``.

    Raises:
        TypeError: If ``name`` or ``env_name`` is omitted.
    """

    def __init__(self, name=None, env_name=None, eval_freq=20000, buffer_size=1000):
        # Fail early with a readable message instead of a string-concatenation error.
        if name is None or env_name is None:
            raise TypeError("CartPoleDQNAgent requires both `name` and `env_name`")

        self.name = name
        self.env_name = env_name
        self.policy = "MlpPolicy"
        self.eval_freq = eval_freq
        self.buffer_size = buffer_size
        self.log_path = os.path.join("Training", f"DQN_{self.name}_Log")
        self.save_path = os.path.join("Saved_Models", f"DQN_{self.name}_Model")
        self.env = self.make_environment()
        self.model = self._build_dqn()

    def make_environment(self):
        """Return a fresh single-environment ``DummyVecEnv`` for ``env_name``."""
        # Build the env inside the factory so the lambda does not capture a
        # local name that is rebound to the wrapper on the same line.
        return DummyVecEnv([lambda: gym.make(self.env_name, render_mode="rgb_array")])

    def _build_dqn(self):
        """Create the DQN model bound to ``self.env``, logging to ``self.log_path``."""
        return DQN(
            policy=self.policy,
            env=self.env,
            verbose=0,
            tensorboard_log=self.log_path,
            buffer_size=self.buffer_size,
        )

    def _play_one_episode(self):
        """Play one episode with random actions and return its total reward as a float."""
        self.env.reset()
        done = False
        score = 0.0

        while not done:
            action = self.env.action_space.sample()
            # The vectorized env returns length-1 arrays; unwrap them to scalars.
            _, rewards, dones, _ = self.env.step([action])
            score += float(rewards[0])
            done = bool(dones[0])

        return score

    def play_episodes(self, num_episodes=10, play_type="random"):
        """Play ``num_episodes`` episodes and print each episode's score.

        Args:
            num_episodes: Number of episodes to play.
            play_type: ``"random"`` samples actions from the action space;
                ``"predict"`` uses ``self.model`` and also writes a video of
                the rendered frames under ``self.save_path``.

        Raises:
            ValueError: If ``play_type`` is neither ``"random"`` nor ``"predict"``.
        """
        if play_type == "random":
            print(f"Playing the {self.name} game randomly for {num_episodes} episodes")
            scores = [self._play_one_episode() for _ in range(num_episodes)]
            for episode, score in enumerate(scores, 1):
                print(f"Episode {episode}: {score}")
        elif play_type == "predict":
            self._play_with_model(num_episodes)
        else:
            raise ValueError(
                f"Unknown play_type {play_type!r}; expected 'random' or 'predict'"
            )

    def _play_with_model(self, num_episodes):
        """Play episodes with the model's actions, print scores and save a video."""
        frames = []

        for episode in range(1, num_episodes + 1):
            obs = self.env.reset()
            done = False
            score = 0.0

            while not done:
                action, _ = self.model.predict(obs)
                obs, rewards, dones, _ = self.env.step(action)
                score += float(rewards[0])
                done = bool(dones[0])
                # Copy the rendered frame so later renders cannot alias it.
                frames.append(np.array(self.env.render()))

            print(f"Episode {episode}: {score}")

        # Nothing to encode when no episode was played.
        if frames:
            video_path = os.path.join(self.save_path, self.name + "_Agent_play")
            save_video(frames, video_path, fps=30, name_prefix=f"{self.name}-agent-play")

    def train(self, time_steps=None, stop_value=None):
        """Train the model, evaluating periodically and saving the best model.

        Args:
            time_steps: Total number of training timesteps (required).
            stop_value: Optional reward threshold handed to
                ``StopTrainingOnRewardThreshold``; when ``None`` no
                early-stopping callback is attached.

        Raises:
            TypeError: If ``time_steps`` is omitted.
        """
        if time_steps is None:
            raise TypeError("train() requires `time_steps`")

        stop_callback = None
        if stop_value is not None:
            stop_callback = StopTrainingOnRewardThreshold(reward_threshold=stop_value, verbose=0)

        # Evaluate on a separate environment: running evaluation episodes on the
        # training env would reset/step it behind the learner's back.
        eval_env = self.make_environment()
        try:
            eval_callback = EvalCallback(
                eval_env,
                callback_on_new_best=stop_callback,
                eval_freq=self.eval_freq,
                best_model_save_path=self.save_path,
            )
            self.model.learn(total_timesteps=time_steps, callback=eval_callback)
        finally:
            eval_env.close()

    def evaluate_policy(self, episodes=None):
        """Print the mean and standard deviation of reward over ``episodes`` episodes.

        Args:
            episodes: Number of evaluation episodes; defaults to 10 when ``None``.
        """
        if episodes is None:
            episodes = 10
        mean_reward, reward_std = evaluate_policy(self.model, self.env, n_eval_episodes=episodes)
        print(f"Mean reward over {episodes} episodes is {mean_reward} with a standard deviation of {reward_std}")

    def close_env(self):
        """Close the agent's environment."""
        self.env.close()

In [4]:
# Create the CartPole agent; the constructor builds the vectorized
# environment and the DQN model.
CartPole_agent = CartPoleDQNAgent(name="CartPole", env_name="CartPole-v1")

In [5]:
# Play CartPole with random actions for 20 episodes (play_type defaults to
# "random") to get a baseline score before training.
CartPole_agent.play_episodes(num_episodes=20)

Playing the CartPole game randomly for 20 episodes
Episode 1: [15.]
Episode 2: [26.]
Episode 3: [9.]
Episode 4: [17.]
Episode 5: [20.]
Episode 6: [15.]
Episode 7: [40.]
Episode 8: [36.]
Episode 9: [14.]
Episode 10: [18.]
Episode 11: [9.]
Episode 12: [10.]
Episode 13: [42.]
Episode 14: [20.]
Episode 15: [15.]
Episode 16: [22.]
Episode 17: [35.]
Episode 18: [16.]
Episode 19: [22.]
Episode 20: [44.]


In [6]:
# Train the agent on CartPole for up to 200,000 timesteps; stop_value=500 is
# the reward threshold handed to StopTrainingOnRewardThreshold.
CartPole_agent.train(time_steps=200000, stop_value=500)

Eval num_timesteps=20000, episode_reward=28.80 +/- 9.50
Episode length: 28.80 +/- 9.50
New best mean reward!
Eval num_timesteps=40000, episode_reward=23.20 +/- 3.31
Episode length: 23.20 +/- 3.31
Eval num_timesteps=60000, episode_reward=9.80 +/- 1.17
Episode length: 9.80 +/- 1.17
Eval num_timesteps=80000, episode_reward=9.40 +/- 0.49
Episode length: 9.40 +/- 0.49
Eval num_timesteps=100000, episode_reward=13.20 +/- 1.60
Episode length: 13.20 +/- 1.60
Eval num_timesteps=120000, episode_reward=14.40 +/- 1.62
Episode length: 14.40 +/- 1.62
Eval num_timesteps=140000, episode_reward=171.20 +/- 5.60
Episode length: 171.20 +/- 5.60
New best mean reward!
Eval num_timesteps=160000, episode_reward=151.00 +/- 21.91
Episode length: 151.00 +/- 21.91
Eval num_timesteps=180000, episode_reward=21.20 +/- 3.49
Episode length: 21.20 +/- 3.49
Eval num_timesteps=200000, episode_reward=68.40 +/- 2.33
Episode length: 68.40 +/- 2.33


In [7]:
# Run the trained model on CartPole for 10 episodes and save a video of the
# rendered frames under the agent's save path.
CartPole_agent.play_episodes(num_episodes=10, play_type="predict")

Episode 1: [66.]
Episode 2: [73.]
Episode 3: [68.]
Episode 4: [69.]
Episode 5: [62.]
Episode 6: [70.]
Episode 7: [65.]
Episode 8: [66.]
Episode 9: [73.]
Episode 10: [67.]
Moviepy - Building video c:\Users\Steel\Downloads\my_atari_games\Saved_Models\DQN_CartPole_Model\CartPole_Agent_play/CartPole-agent-play-episode-0.mp4.
Moviepy - Writing video c:\Users\Steel\Downloads\my_atari_games\Saved_Models\DQN_CartPole_Model\CartPole_Agent_play/CartPole-agent-play-episode-0.mp4



                                                               

Moviepy - Done !
Moviepy - video ready c:\Users\Steel\Downloads\my_atari_games\Saved_Models\DQN_CartPole_Model\CartPole_Agent_play/CartPole-agent-play-episode-0.mp4


In [8]:
# Close the CartPole environment now that training and playback are done.
CartPole_agent.close_env()

# DQNAgent for SpaceInvaders and Pac-Man

In [9]:
class DQNAgent:
    """DQN agent with a CNN policy for image-observation Gymnasium environments.

    The constructor creates ``gym.make(env_name, render_mode="rgb_array")``
    wrapped in ``ResizeObservation`` (84x84) and a Stable-Baselines3 ``DQN``
    model with ``CnnPolicy`` bound to it.

    Args:
        name: Short label used in log/model directory names and in messages.
        env_name: Gymnasium environment id, e.g. ``"SpaceInvaders-v4"``.
        eval_freq: Run an evaluation every ``eval_freq`` training steps.
        buffer_size: Replay buffer capacity passed to ``DQN``.

    Raises:
        TypeError: If ``name`` or ``env_name`` is omitted.
    """

    def __init__(self, name=None, env_name=None, eval_freq=20000, buffer_size=1000):
        # Fail early with a readable message instead of a string-concatenation error.
        if name is None or env_name is None:
            raise TypeError("DQNAgent requires both `name` and `env_name`")

        self.name = name
        self.env_name = env_name
        self.eval_freq = eval_freq
        self.buffer_size = buffer_size
        self.log_path = os.path.join("Training", f"DQN_{self.name}_Log")
        self.save_path = os.path.join("Saved_Models", f"DQN_{self.name}_Model")
        self.env = self.make_environment()
        self.model = self._build_dqn()

    def make_environment(self):
        """Return a fresh ``env_name`` environment with observations resized to 84x84."""
        env = gym.make(self.env_name, render_mode="rgb_array")
        # Explicit (height, width) tuple; equivalent to the int shorthand 84.
        return ResizeObservation(env, (84, 84))

    def _build_dqn(self):
        """Create the CNN-policy DQN model bound to ``self.env``."""
        return DQN(
            CnnPolicy,
            self.env,
            verbose=0,
            tensorboard_log=self.log_path,
            buffer_size=self.buffer_size,
        )

    def _play_one_episode(self):
        """Play one episode with random actions and return its total reward as a float."""
        self.env.reset()
        done = False
        score = 0.0

        while not done:
            action = self.env.action_space.sample()
            _, reward, terminated, truncated, _ = self.env.step(action)
            score += float(reward)
            # An episode ends on termination OR truncation (e.g. a time limit);
            # checking only `terminated` would keep stepping a finished env.
            done = bool(terminated or truncated)

        return score

    def play_episodes(self, num_episodes=10, play_type="random"):
        """Play ``num_episodes`` episodes and print each episode's score.

        Args:
            num_episodes: Number of episodes to play.
            play_type: ``"random"`` samples actions from the action space;
                ``"predict"`` uses ``self.model`` and also writes a video of
                the rendered frames under ``self.save_path``.

        Raises:
            ValueError: If ``play_type`` is neither ``"random"`` nor ``"predict"``.
        """
        if play_type == "random":
            print(f"Playing the {self.name} game randomly for {num_episodes} episodes")
            scores = [self._play_one_episode() for _ in range(num_episodes)]
            for episode, score in enumerate(scores, 1):
                print(f"Episode {episode}: {score}")
        elif play_type == "predict":
            self._play_with_model(num_episodes)
        else:
            raise ValueError(
                f"Unknown play_type {play_type!r}; expected 'random' or 'predict'"
            )

    def _play_with_model(self, num_episodes):
        """Play episodes with the model's actions, print scores and save a video."""
        frames = []

        for episode in range(1, num_episodes + 1):
            obs, _ = self.env.reset()
            done = False
            score = 0.0

            while not done:
                action, _ = self.model.predict(obs)
                obs, reward, terminated, truncated, _ = self.env.step(action)
                score += float(reward)
                done = bool(terminated or truncated)
                # Copy the rendered frame so later renders cannot alias it.
                frames.append(np.array(self.env.render()))

            print(f"Episode {episode}: {score}")

        # Nothing to encode when no episode was played.
        if frames:
            video_path = os.path.join(self.save_path, self.name + "_Agent_play")
            save_video(frames, video_path, fps=30, name_prefix=f"{self.name}-agent-play")

    def train(self, time_steps=None, stop_value=None):
        """Train the model, evaluating periodically and saving the best model.

        Args:
            time_steps: Total number of training timesteps (required).
            stop_value: Optional reward threshold handed to
                ``StopTrainingOnRewardThreshold``; when ``None`` no
                early-stopping callback is attached.

        Raises:
            TypeError: If ``time_steps`` is omitted.
        """
        if time_steps is None:
            raise TypeError("train() requires `time_steps`")

        stop_callback = None
        if stop_value is not None:
            stop_callback = StopTrainingOnRewardThreshold(reward_threshold=stop_value, verbose=0)

        # Evaluate on a separate environment: running evaluation episodes on the
        # training env would reset/step it behind the learner's back.
        eval_env = self.make_environment()
        try:
            eval_callback = EvalCallback(
                eval_env,
                callback_on_new_best=stop_callback,
                eval_freq=self.eval_freq,
                best_model_save_path=self.save_path,
            )
            self.model.learn(total_timesteps=time_steps, callback=eval_callback)
        finally:
            eval_env.close()

    def evaluate_policy(self, episodes=None):
        """Print the mean and standard deviation of reward over ``episodes`` episodes.

        Args:
            episodes: Number of evaluation episodes; defaults to 10 when ``None``.
        """
        if episodes is None:
            episodes = 10
        mean_reward, reward_std = evaluate_policy(self.model, self.env, n_eval_episodes=episodes)
        print(f"Mean reward over {episodes} episodes is {mean_reward} with a standard deviation of {reward_std}")

    def load_best_model(self):
        """Load and return the ``best_model`` checkpoint stored under ``self.save_path``.

        The loaded model is returned to the caller; ``self.model`` is not replaced.
        """
        return DQN.load(os.path.join(self.save_path, "best_model"))

    def save_model(self):
        """Save the current model, passing ``self.save_path`` as the target path."""
        return self.model.save(self.save_path)

    def close_env(self):
        """Close the agent's environment."""
        self.env.close()

# SpaceInvaders

In [10]:
# Create the Space Invaders agent; the constructor builds the resized
# environment and the CNN-policy DQN model.
SpaceInvaders_agent = DQNAgent(name="SpaceInvaders", env_name="SpaceInvaders-v4")

In [11]:
# Play Space Invaders with random actions for 20 episodes (play_type defaults
# to "random") to get a baseline score before training.
SpaceInvaders_agent.play_episodes(num_episodes=20)

Playing the SpaceInvaders game randomly for 20 episodes
Episode 1: 10.0
Episode 2: 15.0
Episode 3: 55.0
Episode 4: 100.0
Episode 5: 90.0
Episode 6: 130.0
Episode 7: 155.0
Episode 8: 215.0
Episode 9: 30.0
Episode 10: 190.0
Episode 11: 90.0
Episode 12: 30.0
Episode 13: 100.0
Episode 14: 185.0
Episode 15: 490.0
Episode 16: 385.0
Episode 17: 60.0
Episode 18: 100.0
Episode 19: 105.0
Episode 20: 30.0


In [13]:
# Train the agent for up to 1,000,000 timesteps; stop_value=1000 is the reward
# threshold handed to StopTrainingOnRewardThreshold.
SpaceInvaders_agent.train(time_steps=1000000, stop_value=1000)

Eval num_timesteps=20000, episode_reward=139.00 +/- 35.55
Episode length: 792.20 +/- 35.24
New best mean reward!
Eval num_timesteps=40000, episode_reward=178.00 +/- 118.98
Episode length: 863.80 +/- 185.14
New best mean reward!
Eval num_timesteps=60000, episode_reward=244.00 +/- 77.87
Episode length: 980.40 +/- 301.04
New best mean reward!
Eval num_timesteps=80000, episode_reward=120.00 +/- 66.63
Episode length: 678.80 +/- 207.70
Eval num_timesteps=100000, episode_reward=153.00 +/- 72.77
Episode length: 788.80 +/- 307.79
Eval num_timesteps=120000, episode_reward=161.00 +/- 29.56
Episode length: 753.60 +/- 41.58
Eval num_timesteps=140000, episode_reward=367.00 +/- 173.74
Episode length: 835.20 +/- 200.02
New best mean reward!
Eval num_timesteps=160000, episode_reward=161.00 +/- 110.74
Episode length: 752.00 +/- 146.21
Eval num_timesteps=180000, episode_reward=73.00 +/- 38.94
Episode length: 592.40 +/- 80.62
Eval num_timesteps=200000, episode_reward=87.00 +/- 88.58
Episode length: 621.80

In [14]:
# Evaluate the current model over 10 episodes and print the mean and standard
# deviation of the episode reward.
SpaceInvaders_agent.evaluate_policy(episodes=10)

Mean reward over 10 episodes is 236.5 with a standard deviation of 159.42161083115425


In [15]:
# Run the trained model on Space Invaders for 10 episodes and save a video of
# the rendered frames under the agent's save path.
SpaceInvaders_agent.play_episodes(num_episodes=10, play_type="predict")

Episode 1: 265.0
Episode 2: 235.0
Episode 3: 205.0
Episode 4: 235.0
Episode 5: 140.0
Episode 6: 170.0
Episode 7: 85.0
Episode 8: 185.0
Episode 9: 90.0
Episode 10: 210.0
Moviepy - Building video c:\Users\Steel\Downloads\my_atari_games\Saved_Models\DQN_SpaceInvaders_Model\SpaceInvaders_Agent_play/SpaceInvaders-agent-play-episode-0.mp4.
Moviepy - Writing video c:\Users\Steel\Downloads\my_atari_games\Saved_Models\DQN_SpaceInvaders_Model\SpaceInvaders_Agent_play/SpaceInvaders-agent-play-episode-0.mp4



                                                                 

Moviepy - Done !
Moviepy - video ready c:\Users\Steel\Downloads\my_atari_games\Saved_Models\DQN_SpaceInvaders_Model\SpaceInvaders_Agent_play/SpaceInvaders-agent-play-episode-0.mp4




In [16]:
# Close the Space Invaders environment now that training and playback are done.
SpaceInvaders_agent.close_env()

# Ms. Pac-Man

In [17]:
# Create the Ms. Pac-Man agent; the constructor builds the resized environment
# and the CNN-policy DQN model.
Pacman_agent_agent = DQNAgent(name="Pacman", env_name="MsPacman-v4")

In [18]:
# Play Ms. Pac-Man with random actions for 20 episodes (play_type defaults to
# "random") to get a baseline score before training.
Pacman_agent_agent.play_episodes(num_episodes=20)

Playing the Pacman game randomly for 20 episodes
Episode 1: 210.0
Episode 2: 200.0
Episode 3: 240.0
Episode 4: 400.0
Episode 5: 160.0
Episode 6: 150.0
Episode 7: 190.0
Episode 8: 190.0
Episode 9: 140.0
Episode 10: 220.0
Episode 11: 220.0
Episode 12: 140.0
Episode 13: 210.0
Episode 14: 250.0
Episode 15: 230.0
Episode 16: 160.0
Episode 17: 150.0
Episode 18: 230.0
Episode 19: 270.0
Episode 20: 190.0


In [21]:
# Train the agent for up to 1,000,000 timesteps; stop_value=1000 is the reward
# threshold handed to StopTrainingOnRewardThreshold.
Pacman_agent_agent.train(time_steps=1000000, stop_value=1000)

Eval num_timesteps=20000, episode_reward=486.00 +/- 79.90
Episode length: 861.60 +/- 102.27
New best mean reward!
Eval num_timesteps=40000, episode_reward=502.00 +/- 62.42
Episode length: 695.20 +/- 74.50
New best mean reward!
Eval num_timesteps=60000, episode_reward=542.00 +/- 94.32
Episode length: 629.20 +/- 92.42
New best mean reward!
Eval num_timesteps=80000, episode_reward=518.00 +/- 151.97
Episode length: 750.20 +/- 185.25
Eval num_timesteps=100000, episode_reward=646.00 +/- 220.87
Episode length: 831.20 +/- 86.72
New best mean reward!
Eval num_timesteps=120000, episode_reward=388.00 +/- 76.00
Episode length: 638.00 +/- 125.80
Eval num_timesteps=140000, episode_reward=376.00 +/- 109.65
Episode length: 794.40 +/- 97.70
Eval num_timesteps=160000, episode_reward=482.00 +/- 99.28
Episode length: 730.40 +/- 133.74
Eval num_timesteps=180000, episode_reward=510.00 +/- 374.01
Episode length: 829.00 +/- 204.96
Eval num_timesteps=200000, episode_reward=862.00 +/- 273.31
Episode length: 922

In [22]:
# Evaluate the current model over 10 episodes and print the mean and standard
# deviation of the episode reward.
Pacman_agent_agent.evaluate_policy(episodes=10)

Mean reward over 10 episodes is 728.0 with a standard deviation of 382.1465687403198


In [23]:
# Save the current (final) model to the agent's save path; this is separate
# from the best_model checkpoint written during training evaluations.
Pacman_agent_agent.save_model()

In [24]:
# Run the trained model on Ms. Pac-Man for 10 episodes and save a video of the
# rendered frames under the agent's save path.
Pacman_agent_agent.play_episodes(num_episodes=10, play_type="predict")

Episode 1: 410.0
Episode 2: 480.0
Episode 3: 820.0
Episode 4: 660.0
Episode 5: 800.0
Episode 6: 410.0
Episode 7: 430.0
Episode 8: 1050.0
Episode 9: 530.0
Episode 10: 620.0
Moviepy - Building video c:\Users\Steel\Downloads\my_atari_games\Saved_Models\DQN_Pacman_Model\Pacman_Agent_play/Pacman-agent-play-episode-0.mp4.
Moviepy - Writing video c:\Users\Steel\Downloads\my_atari_games\Saved_Models\DQN_Pacman_Model\Pacman_Agent_play/Pacman-agent-play-episode-0.mp4



                                                                  

Moviepy - Done !
Moviepy - video ready c:\Users\Steel\Downloads\my_atari_games\Saved_Models\DQN_Pacman_Model\Pacman_Agent_play/Pacman-agent-play-episode-0.mp4




In [25]:
# Close the Ms. Pac-Man environment now that training and playback are done.
Pacman_agent_agent.close_env()