In [None]:
!pip install gymnasium
!pip install stable-baselines3[extra]
!pip install gymnasium[atari]
!pip install tensorflow

Collecting shimmy~=1.3.0 (from shimmy[atari]~=1.3.0; extra == "extra"->stable-baselines3[extra])
  Using cached Shimmy-1.3.0-py3-none-any.whl.metadata (3.7 kB)
Using cached Shimmy-1.3.0-py3-none-any.whl (37 kB)
Installing collected packages: shimmy
  Attempting uninstall: shimmy
    Found existing installation: Shimmy 0.2.1
    Uninstalling Shimmy-0.2.1:
      Successfully uninstalled Shimmy-0.2.1
Successfully installed shimmy-1.3.0
Collecting shimmy<1.0,>=0.1.0 (from shimmy[atari]<1.0,>=0.1.0; extra == "atari"->gymnasium[atari])
  Using cached Shimmy-0.2.1-py3-none-any.whl.metadata (2.3 kB)
Using cached Shimmy-0.2.1-py3-none-any.whl (25 kB)
Installing collected packages: shimmy
  Attempting uninstall: shimmy
    Found existing installation: Shimmy 1.3.0
    Uninstalling Shimmy-1.3.0:
      Successfully uninstalled Shimmy-1.3.0
Successfully installed shimmy-0.2.1


# importing the libraries

In [None]:
import os
import gymnasium as gym
import numpy as np
from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback, StopTrainingOnRewardThreshold
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack
from stable_baselines3.dqn.policies import CnnPolicy
from gymnasium.wrappers import FrameStack, ResizeObservation
from gymnasium.utils.save_video import save_video
from PIL import Image
import warnings

warnings.filterwarnings('ignore')


# CartPole Agent

In [None]:
class CartPoleDQNAgent:
    def __init__(self, name=None, env_name=None, eval_freq=20000, buffer_size=1000):
        self.name = name
        self.env_name = env_name
        self.policy = "MlpPolicy"
        self.eval_freq = eval_freq
        self.buffer_size = buffer_size
        self.log_path = os.path.join('Training/DQN_' + self.name + '_Log')
        self.save_path = os.path.join('Saved_Models/DQN_' + self.name + '_Model')
        self.env = self.make_environment()
        self.model = self._build_dqn()

    def make_environment(self):
        env = gym.make(self.env_name, render_mode="rgb_array")
        env = DummyVecEnv([lambda: env])
        return env

    def _build_dqn(self):
        model = DQN(policy=self.policy, env=self.env, verbose=0, tensorboard_log=self.log_path, buffer_size=self.buffer_size)
        return model

    def _play_one_episode(self):
        obs = self.env.reset()
        done = False
        score = 0

        while not done:
            action = self.env.action_space.sample()
            obs, reward, done, _ = self.env.step([action])
            score += reward

        return score

    def play_episodes(self, num_episodes=10, play_type="random"):
        if play_type == "random":
            print(f"Playing the {self.name} game randomly for {num_episodes} episodes")
            scores = [self._play_one_episode() for _ in range(num_episodes)]
            for episode, score in enumerate(scores, 1):
                print(f"Episode {episode}: {score}")

        if play_type == "predict":
            episode_rewards = []
            frames = []

            for episode in range(num_episodes):
                obs = self.env.reset()
                done = False
                score = 0

                while not done:
                    action, _ = self.model.predict(obs)
                    obs, reward, done, *info = self.env.step(action)
                    score += reward
                    frame = Image.fromarray(self.env.render())
                    frame = np.array(frame)
                    frames.append(frame)

                episode_rewards.append(score)

                print(f"Episode {episode+1}: {score}")

            video_path = os.path.join(self.save_path, self.name + "_Agent_play")

            save_video(frames, video_path, fps=30, name_prefix=f"{self.name}-agent-play")

    def train(self, time_steps=None, stop_value=None):
        stop_callback = StopTrainingOnRewardThreshold(reward_threshold=stop_value, verbose=0)
        eval_callback = EvalCallback(self.env, callback_on_new_best=stop_callback, eval_freq=self.eval_freq, best_model_save_path=self.save_path)
        self.model.learn(total_timesteps=time_steps, callback=eval_callback)

    def evaluate_policy(self, episodes=None):
        mean_reward, reward_std = evaluate_policy(self.model, self.env, n_eval_episodes=episodes)
        print(f"Mean reward over {episodes} episodes is {mean_reward} with a standard deviation of {reward_std}")

    def close_env(self):
        self.env.close()

In [None]:
#create the agent and create the environment
CartPole_agent = CartPoleDQNAgent(name="CartPole", env_name="CartPole-v1")

In [None]:
# Modify the play_episodes method to play randomly for a given number of episodes
def play_episodes(self, num_episodes=20):
    for episode in range(num_episodes):
        state = self.env.reset()
        done = False
        while not done:
            action = self.env.action_space.sample()  # Take a random action
            state, reward, done, _ = self.env.step(action)
            self.env.render()  # Optional: render the game

# Play the CartPole game randomly for 30 episodes
CartPole_agent.play_episodes(num_episodes=30)

Playing the CartPole game randomly for 30 episodes
Episode 1: [38.]
Episode 2: [24.]
Episode 3: [48.]
Episode 4: [14.]
Episode 5: [15.]
Episode 6: [16.]
Episode 7: [29.]
Episode 8: [26.]
Episode 9: [14.]
Episode 10: [13.]
Episode 11: [17.]
Episode 12: [11.]
Episode 13: [19.]
Episode 14: [28.]
Episode 15: [19.]
Episode 16: [48.]
Episode 17: [33.]
Episode 18: [15.]
Episode 19: [10.]
Episode 20: [13.]
Episode 21: [23.]
Episode 22: [17.]
Episode 23: [20.]
Episode 24: [25.]
Episode 25: [12.]
Episode 26: [29.]
Episode 27: [9.]
Episode 28: [11.]
Episode 29: [16.]
Episode 30: [32.]


In [None]:
# Modify the train method to include custom early stopping criteria
def train(self, time_steps=200000, stop_value=500):
    best_performance = 0
    for step in range(time_steps):
        # Training logic here
        if step % 10000 == 0:  # Log every 10,000 steps
            print(f"Time Step: {step}, Best Performance so far: {best_performance}")

        # Custom early stopping condition
        if self.current_performance > best_performance:
            best_performance = self.current_performance

        if self.current_performance >= stop_value:
            print(f"Stopping early at step {step}, reached stop value with performance of {self.current_performance}")
            break

# Test out the agent with the CartPole game
CartPole_agent.train(time_steps=200000, stop_value=500)

Eval num_timesteps=20000, episode_reward=9.40 +/- 0.49
Episode length: 9.40 +/- 0.49
New best mean reward!
Eval num_timesteps=40000, episode_reward=119.80 +/- 33.84
Episode length: 119.80 +/- 33.84
New best mean reward!
Eval num_timesteps=60000, episode_reward=170.80 +/- 42.05
Episode length: 170.80 +/- 42.05
New best mean reward!
Eval num_timesteps=80000, episode_reward=90.20 +/- 26.59
Episode length: 90.20 +/- 26.59
Eval num_timesteps=100000, episode_reward=226.60 +/- 30.47
Episode length: 226.60 +/- 30.47
New best mean reward!
Eval num_timesteps=120000, episode_reward=149.20 +/- 6.82
Episode length: 149.20 +/- 6.82
Eval num_timesteps=140000, episode_reward=143.80 +/- 8.52
Episode length: 143.80 +/- 8.52
Eval num_timesteps=160000, episode_reward=9.60 +/- 0.49
Episode length: 9.60 +/- 0.49
Eval num_timesteps=180000, episode_reward=9.40 +/- 0.49
Episode length: 9.40 +/- 0.49
Eval num_timesteps=200000, episode_reward=103.40 +/- 26.09
Episode length: 103.40 +/- 26.09


In [None]:
# Modify the play_episodes method to support different play types
def play_episodes(self, num_episodes=10, play_type="predict"):
    for episode in range(num_episodes):
        state = self.env.reset()
        done = False
        total_reward = 0
        while not done:
            if play_type == "predict":
                action = self.predict(state)  # Predict action using the agent's model
            elif play_type == "random":
                action = self.env.action_space.sample()  # Take a random action
            else:
                raise ValueError(f"Unknown play_type: {play_type}")

            state, reward, done, _ = self.env.step(action)
            total_reward += reward

        print(f"Episode {episode + 1}: Total Reward = {total_reward}")

# Test out the agent with the CartPole game for 10 episodes using the "predict" play type
CartPole_agent.play_episodes(num_episodes=10, play_type="predict")

Episode 1: [63.]
Episode 2: [136.]
Episode 3: [78.]
Episode 4: [82.]
Episode 5: [144.]
Episode 6: [104.]
Episode 7: [133.]
Episode 8: [98.]
Episode 9: [78.]
Episode 10: [67.]
Moviepy - Building video /content/Saved_Models/DQN_CartPole_Model/CartPole_Agent_play/CartPole-agent-play-episode-0.mp4.
Moviepy - Writing video /content/Saved_Models/DQN_CartPole_Model/CartPole_Agent_play/CartPole-agent-play-episode-0.mp4



                                                               

Moviepy - Done !
Moviepy - video ready /content/Saved_Models/DQN_CartPole_Model/CartPole_Agent_play/CartPole-agent-play-episode-0.mp4




In [None]:
#close the environment
CartPole_agent.close_env()

# DQNAgent for SpaceInvaders and Pac-Man

In [None]:
class DQNAgent:
    def __init__(self, name=None, env_name=None, eval_freq=20000, buffer_size=1000):
        self.name = name
        self.env_name = env_name
        self.eval_freq = eval_freq
        self.buffer_size = buffer_size
        self.log_path = os.path.join('Training/DQN_' + self.name + '_Log')
        self.save_path = os.path.join('Saved_Models/DQN_' + self.name + '_Model')
        self.env = self.make_environment()
        self.model = self._build_dqn()

    def make_environment(self):
        env = gym.make(self.env_name, render_mode="rgb_array")
        env = ResizeObservation(env, 84)
        return env

    def _build_dqn(self):
        model = DQN(CnnPolicy, self.env, verbose=0, tensorboard_log=self.log_path, buffer_size=self.buffer_size)
        return model

    def _play_one_episode(self):
        obs, _ = self.env.reset()
        done = False
        score = 0

        while not done:
            action = self.env.action_space.sample()
            obs, reward, done, *info = self.env.step(action)
            score += reward

        return score

    def play_episodes(self, num_episodes=10, play_type="random"):
        if play_type == "random":
            print(f"Playing the {self.name} game randomly for {num_episodes} episodes")
            scores = [self._play_one_episode() for _ in range(num_episodes)]
            for episode, score in enumerate(scores, 1):
                print(f"Episode {episode}: {score}")

        if play_type == "predict":
            episode_rewards = []
            frames = []

            for episode in range(num_episodes):
                obs, _ = self.env.reset()
                done = False
                score = 0

                while not done:
                    action, _ = self.model.predict(obs)
                    obs, reward, done, *info = self.env.step(action)
                    score += reward
                    frame = Image.fromarray(self.env.render())
                    frame = np.array(frame)
                    frames.append(frame)

                episode_rewards.append(score)

                print(f"Episode {episode+1}: {score}")

            video_path = os.path.join(self.save_path, self.name + "_Agent_play")

            save_video(frames, video_path, fps=30, name_prefix=f"{self.name}-agent-play")

    def train(self, time_steps=None, stop_value=None):
        stop_callback = StopTrainingOnRewardThreshold(reward_threshold=stop_value, verbose=0)
        eval_callback = EvalCallback(self.env, callback_on_new_best=stop_callback, eval_freq=self.eval_freq, best_model_save_path=self.save_path)
        self.model.learn(total_timesteps=time_steps, callback=eval_callback)

    def evaluate_policy(self, episodes=None):
        mean_reward, reward_std = evaluate_policy(self.model, self.env, n_eval_episodes=episodes)
        print(f"Mean reward over {episodes} episodes is {mean_reward} with a standard deviation of {reward_std}")

    def load_best_model(self):
        best_model = DQN.load(self.save_path + "/best_model")
        return best_model

    def save_model(self):
        return self.model.save(self.save_path)

    def close_env(self):
        self.env.close()

# SpaceInvaders

In [None]:
print("Initializing the SpaceInvaders agent...")
try:
    SpaceInvaders_agent = DQNAgent(name="SpaceInvaders", env_name="SpaceInvaders-v4")
    print("Agent object:", SpaceInvaders_agent)
    print("Attributes:", dir(SpaceInvaders_agent))

    # Check if env_name exists
    if hasattr(SpaceInvaders_agent, 'env_name'):
        print("SpaceInvaders agent initialized with environment:", SpaceInvaders_agent.env_name)
    else:
        print("env_name attribute is not found.")
except Exception as e:
    print(f"Error during initialization: {e}")

Initializing the SpaceInvaders agent...
Agent object: <__main__.DQNAgent object at 0x7b73e22d6410>
Attributes: ['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_build_dqn', '_play_one_episode', 'buffer_size', 'close_env', 'env', 'env_name', 'eval_freq', 'evaluate_policy', 'load_best_model', 'log_path', 'make_environment', 'model', 'name', 'play_episodes', 'save_model', 'save_path', 'train']
SpaceInvaders agent initialized with environment: SpaceInvaders-v4


In [None]:
# test out the agent with the space invaders game
SpaceInvaders_agent.play_episodes(num_episodes=30)

Playing the SpaceInvaders game randomly for 30 episodes
Episode 1: 110.0
Episode 2: 135.0
Episode 3: 210.0
Episode 4: 5.0
Episode 5: 285.0
Episode 6: 75.0
Episode 7: 210.0
Episode 8: 315.0
Episode 9: 235.0
Episode 10: 515.0
Episode 11: 255.0
Episode 12: 20.0
Episode 13: 385.0
Episode 14: 100.0
Episode 15: 145.0
Episode 16: 130.0
Episode 17: 125.0
Episode 18: 120.0
Episode 19: 45.0
Episode 20: 15.0
Episode 21: 180.0
Episode 22: 330.0
Episode 23: 80.0
Episode 24: 120.0
Episode 25: 35.0
Episode 26: 65.0
Episode 27: 415.0
Episode 28: 105.0
Episode 29: 210.0
Episode 30: 80.0


In [None]:
import gym
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv

# Import correct Atari environment depending on whether gym or gymnasium is used
import gymnasium as gym  # if using gymnasium (remove this if using gym)

# Wrap the environment in a VecEnv
env = DummyVecEnv([lambda: gym.make("ALE/SpaceInvaders-v5")])  # For Atari environments in Gymnasium

# Initialize the DQN agent
model = DQN("CnnPolicy", env, verbose=1)

# Train the model
model.learn(total_timesteps=10000)

# Save the trained model
model.save("dqn_space_invaders")

# To close the environment
env.close()

Using cuda device
Wrapping the env in a VecTransposeImage.
----------------------------------
| rollout/            |          |
|    exploration_rate | 0.05     |
| time/               |          |
|    episodes         | 4        |
|    fps              | 203      |
|    time_elapsed     | 12       |
|    total_timesteps  | 2571     |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 0.000683 |
|    n_updates        | 617      |
----------------------------------
----------------------------------
| rollout/            |          |
|    exploration_rate | 0.05     |
| time/               |          |
|    episodes         | 8        |
|    fps              | 197      |
|    time_elapsed     | 24       |
|    total_timesteps  | 4862     |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 0.452    |
|    n_updates        | 1190     |
----------------------------------
-------------------------------

# Pacman

In [None]:
#initialize the agent and create the environment
Pacman_agent_agent = DQNAgent(name="Pacman", env_name="MsPacman-v4")

In [None]:
#Play the pacman game randomly for 30 episodes
Pacman_agent_agent.play_episodes(num_episodes=30)

Playing the Pacman game randomly for 30 episodes
Episode 1: 240.0
Episode 2: 130.0
Episode 3: 250.0
Episode 4: 220.0
Episode 5: 250.0
Episode 6: 230.0
Episode 7: 180.0
Episode 8: 240.0
Episode 9: 190.0
Episode 10: 180.0
Episode 11: 190.0
Episode 12: 250.0
Episode 13: 230.0
Episode 14: 120.0
Episode 15: 120.0
Episode 16: 120.0
Episode 17: 230.0
Episode 18: 210.0
Episode 19: 210.0
Episode 20: 190.0
Episode 21: 280.0
Episode 22: 280.0
Episode 23: 250.0
Episode 24: 260.0
Episode 25: 320.0
Episode 26: 160.0
Episode 27: 210.0
Episode 28: 250.0
Episode 29: 160.0
Episode 30: 210.0


In [None]:
import gym
from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy

# Create the Pacman environment (or whatever environment you are using)
env = gym.make("MsPacman-v0")

# Initialize the DQN agent
model = DQN("CnnPolicy", env, verbose=1)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.


In [None]:
import gym
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

class DQNAgent:
    def __init__(self, name, env_name):
        # Initialize the environment
        self.env = gym.make(env_name)
        self.state_size = self.env.observation_space.shape[0]  # Define state_size from environment
        self.action_size = self.env.action_space.n  # Number of actions
        self.name = name
        self.model = self.build_model()  # Build the neural network model

    def build_model(self):
        # Build a simple neural network using Keras for the DQN model
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(Dense(24, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))  # Output layer for Q-values
        model.compile(loss='mse', optimizer='adam')
        return model

    def save_model(self, filepath):
        # Use Keras' save function to save the model
        self.model.save(filepath)
        print(f"Model saved to {filepath}")

# Example usage:
Pacman_agent_agent = DQNAgent(name="Pacman", env_name="MsPacman-v0")  # Adjust the environment name
Pacman_agent_agent.save_model("pacman_dqn_model.h5")



Model saved to pacman_dqn_model.h5


In [None]:
def close_env(self):
    # Close the environment
    self.env.close()
    print(f"Environment for {self.name} has been closed.")

In [27]:
import gym
import numpy as np
from keras.models import Sequential, load_model
from keras.layers import Dense

class DQNAgent:
    def __init__(self, name, env_name):
        # Initialize the environment
        self.env = gym.make(env_name)
        self.state_size = np.prod(self.env.observation_space.shape)  # Flatten the state space
        self.action_size = self.env.action_space.n  # Action space size
        self.name = name
        self.model = self.build_model()  # Build the DQN model

    def build_model(self):
        # Build a simple neural network for the DQN model
        model = Sequential()
        model.add(Dense(24, input_dim=self.state_size, activation='relu'))
        model.add(Dense(24, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))  # Q-values for each action
        model.compile(loss='mse', optimizer='adam')
        return model

    def save_model(self, filepath):
        # Save the model to a file
        self.model.save(filepath)
        print(f"Model saved to {filepath}")

    def close_env(self):
        # Close the environment
        self.env.close()
        print(f"Environment for {self.name} has been closed.")


# Space Invaders
spaceinvaders_agent = DQNAgent(name="SpaceInvaders", env_name="SpaceInvaders-v0")
spaceinvaders_agent.save_model("SpaceInvaders_final_model.h5")  # Save final model
spaceinvaders_agent.close_env()

# CartPole
cartpole_agent = DQNAgent(name="CartPole", env_name="CartPole-v0")
cartpole_agent.save_model("CartPole_final_model.h5")  # Save final model
cartpole_agent.close_env()



Model saved to SpaceInvaders_final_model.h5
Environment for SpaceInvaders has been closed.
Model saved to CartPole_final_model.h5
Environment for CartPole has been closed.
