In [1]:
import sys
import subprocess

# Create a virtual environment named 'myenv'
subprocess.run([sys.executable, '-m', 'venv', 'myenv'])

# Activate the virtual environment (for Windows)
activate_script = 'myenv\\Scripts\\activate' if sys.platform == 'win32' else 'source myenv/bin/activate'
subprocess.run(activate_script, shell=True)


CompletedProcess(args='myenv\\Scripts\\activate', returncode=0)

In [2]:
pip install gym

Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install --upgrade stable-baselines3





In [4]:
pip install stable-baselines3[extra]

Note: you may need to restart the kernel to use updated packages.


In [5]:
pip install moviepy


Note: you may need to restart the kernel to use updated packages.


In [6]:
import os
import gym
import numpy as np
from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import EvalCallback, StopTrainingOnRewardThreshold
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.dqn.policies import CnnPolicy
from gym.wrappers import ResizeObservation
from gym.utils import save_video
from PIL import Image

from stable_baselines3.common.monitor import Monitor

from gym.utils.save_video import save_video
import warnings


# Ignore all warnings
warnings.filterwarnings("ignore")




In [7]:
#CARTPOLE GAME 


In [8]:
class CustomCartPoleDQNAgent:
    def __init__(self, agent_name=None, env_name=None, eval_frequency=20000, buffer_size=1000):
        self.agent_name = agent_name
        self.env_name = env_name
        self.policy = "MlpPolicy"  # policy
        self.eval_frequency = eval_frequency
        self.buffer_size = buffer_size
        self.log_path = os.path.join('C:/Users/Great Woman/Downloads/CartPole_Logs/DQN_' + self.agent_name + '_Log')  # path for logging the training data
        self.save_path = os.path.join('C:/Users/Great Woman/Downloads/CartPole_Models/DQN_' + self.agent_name + '_Model')  # path for saving the trained model
        self.env = self.make_environment()  # function that creates the environment and agent
        self.model = self._build_dqn()  # function that builds the DQN model

    def make_environment(self):  # A call to the function that creates the environment
        env = gym.make(self.env_name, render_mode="rgb_array")  # creates the environment and agent
        env = DummyVecEnv([lambda: env])  # creates a vectorized dummy environment
        return env  # returns the created environment

    def _build_dqn(self):  # A call to the function that builds the DQN model
        model = DQN(self.policy, self.env, verbose=0, tensorboard_log=self.log_path, buffer_size=self.buffer_size)  # creates the DQN model
        return model  # returns the created DQN model

    def _play_one_episode(self):  # A call to the function that plays one episode
        obs = self.env.reset()  # resets the environment
        done = False  # sets the done flag
        score = 0  # sets the score to zero

        while not done:  # loops until the done flag is set
            action = self.env.action_space.sample()  # selects an action from a sample space
            obs, reward, done, _ = self.env.step([action])  # takes the action and returns the observation, reward, done, and info
            score += reward  # Updates the score

        return score  # returns the score value

    def play_episodes(self, num_episodes=10, play_type="random"):  # A call to the function that plays episodes
        if play_type == "random":  # if the play type is random
            print(f"Playing the {self.agent_name} game randomly for {num_episodes} episodes")  # prints the message
            scores = [self._play_one_episode() for _ in range(num_episodes)]  # creates a list of scores
            for episode, score in enumerate(scores, 1):  # loops through the list of scores
                print(f"Episode {episode}: {score}")  # prints the score

        if play_type == "predict":  # if the play type is predict
            episode_rewards = []  # creates a list of episode rewards
            frames = []  # creates a list of frames for the images

            for episode in range(num_episodes):  # loops through the number of episodes
                obs = self.env.reset()  # resets the environment
                done = False  # sets the done flag
                score = 0  # sets the score to zero

                while not done:  # loops until the done flag is set
                    action, _ = self.model.predict(obs)  # predicts the action to take from the observation
                    obs, reward, done, _ = self.env.step(action)  # takes the action and returns the observation, reward, done, and info
                    score += reward  # Updates the score
                    frame = Image.fromarray(self.env.render())  # Captures the frame of image from the environment
                    frame = np.array(frame)  # converts the frame to numpy
                    frames.append(frame)  # adds the frame to the list

                episode_rewards.append(score)  # adds the score to the list

                print(f"Episode {episode+1}: {score}")  # prints the score

            video_path = os.path.join(self.save_path, self.agent_name + "_Agent_play")  # video path

            save_video(frames, video_path, fps=30, name_prefix=f"{self.agent_name}-agent-play")  # saves the video

    def train(self, time_steps=None, stop_value=None):  # A call to the function that trains the agent
        stop_callback = StopTrainingOnRewardThreshold(reward_threshold=stop_value, verbose=0)  # creates the stop callback, assigns the reward threshold so training can stop
        eval_callback = EvalCallback(self.env, callback_on_new_best=stop_callback,
                                     eval_freq=self.eval_frequency, best_model_save_path=self.save_path)  # creates the eval callback, checks if the reward has been achieved
        self.model.learn(total_timesteps=time_steps, callback=eval_callback)  # trains the model

    def evaluate_policy(self, episodes=None):  # A call to the function that evaluates the policy
        mean_reward, reward_std = evaluate_policy(self.model, self.env, n_eval_episodes=episodes)  # evaluates the policy
        print(f"Mean reward over {episodes} episodes is {mean_reward} with a standard deviation of {reward_std}")  # prints the mean reward and standard deviation

    def close_env(self):  # A call to the function that closes the environment
        self.env.close()  # closes the environment

# Create the agent and create the environment
CustomCartPole_agent = CustomCartPoleDQNAgent(agent_name="CustomCartPole", env_name="CartPole-v1")

# Play the CartPole game randomly for 20 episodes
CustomCartPole_agent.play_episodes(num_episodes=20)

# Test out the agent with the CartPole game
CustomCartPole_agent.train(time_steps=1000000, stop_value=500)

# Test out the agent with the CartPole game
CustomCartPole_agent.play_episodes(num_episodes=10, play_type="predict")

# Close the environment
CustomCartPole_agent.close_env()


Playing the CustomCartPole game randomly for 20 episodes
Episode 1: [11.]
Episode 2: [16.]
Episode 3: [39.]
Episode 4: [44.]
Episode 5: [11.]
Episode 6: [39.]
Episode 7: [13.]
Episode 8: [39.]
Episode 9: [40.]
Episode 10: [24.]
Episode 11: [17.]
Episode 12: [60.]
Episode 13: [13.]
Episode 14: [32.]
Episode 15: [15.]
Episode 16: [29.]
Episode 17: [12.]
Episode 18: [18.]
Episode 19: [13.]
Episode 20: [21.]
Eval num_timesteps=20000, episode_reward=10.00 +/- 0.00
Episode length: 10.00 +/- 0.00
New best mean reward!
Eval num_timesteps=40000, episode_reward=8.80 +/- 0.40
Episode length: 8.80 +/- 0.40
Eval num_timesteps=60000, episode_reward=9.00 +/- 0.63
Episode length: 9.00 +/- 0.63
Eval num_timesteps=80000, episode_reward=27.40 +/- 11.72
Episode length: 27.40 +/- 11.72
New best mean reward!
Eval num_timesteps=100000, episode_reward=201.20 +/- 18.37
Episode length: 201.20 +/- 18.37
New best mean reward!
Eval num_timesteps=120000, episode_reward=457.60 +/- 52.39
Episode length: 457.60 +/- 52

                                                                                                                       

Moviepy - Done !
Moviepy - video ready C:\Users\Great Woman\Downloads\CartPole_Models\DQN_CustomCartPole_Model\CustomCartPole_Agent_play/CustomCartPole-agent-play-episode-0.mp4


In [9]:
## SPACE INVADERS GAME

In [None]:
import os
import gym
import numpy as np
from PIL import Image
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import VecFrameStack
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.env_util import make_atari_env
import imageio

class DQNAgent:
    def __init__(self, name=None, env_name=None, eval_freq=20000, buffer_size=1000):
        self.name = name
        self.env_name = env_name
        self.eval_freq = eval_freq
        self.buffer_size = buffer_size
        self.log_path = os.path.join('C:/Users/Great Woman/Downloads/my_atari_games/Logs/DQN_' + self.name)
        self.save_path = os.path.join('C:/Users/Great Woman/Downloads/my_atari_games/Saved_Models/DQN_' + self.name)
        self.env = self.make_environment()
        self.model = self._build_dqn()

    def make_environment(self):
        env = make_atari_env(self.env_name, n_envs=1, seed=0, monitor_dir=self.log_path)
        env = VecFrameStack(env, n_stack=4)
        return env

    def _build_dqn(self):
        model = DQN('CnnPolicy', self.env, buffer_size=self.buffer_size, verbose=0, tensorboard_log=self.log_path)
        return model

    def _play_one_episode(self):
        obs = self.env.reset()
        done = False
        score = 0
        frames = []

        while not done:
            action, _ = self.model.predict(obs)
            obs, reward, done, _ = self.env.step(action)
            score += reward
            frame = Image.fromarray(self.env.render(mode='rgb_array'))
            frame = np.array(frame)
            frames.append(frame)

        return score, frames

    def play_episodes(self, num_episodes=10, play_type="random"):
        if play_type == "random":
            print(f"Playing the {self.name} game randomly for {num_episodes} episodes")
            scores = [self._play_one_episode()[0] for _ in range(num_episodes)]
            for episode, score in enumerate(scores, 1):
                print(f"Episode {episode}: {score}")

        if play_type == "predict":
            print(f"Playing {self.name} game and saving video for one episode")
            score, frames = self._play_one_episode()
            print(f"Episode 1: {score}")

            # Save gameplay as an MP4 file
            video_path = os.path.join(self.save_path, f"{self.name}_episode_1.mp4")
            self._save_video(frames, video_path)

            # Add zero episode
            zero_frames = [frames[0]] * 30  # Repeat the first frame 30 times for a duration of 1 second
            zero_video_path = os.path.join(self.save_path, f"{self.name}_episode_0.mp4")
            self._save_video(zero_frames, zero_video_path)

    def _save_video(self, frames, video_path, fps=30):
        with imageio.get_writer(video_path, fps=fps) as video:
            for frame in frames:
                video.append_data(frame)

    def train(self, time_steps=None, stop_value=None):
        eval_callback = EvalCallback(self.env, best_model_save_path=self.save_path, log_path=self.log_path, eval_freq=self.eval_freq)
        self.model.learn(total_timesteps=time_steps, callback=eval_callback)

    def evaluate_policy(self, episodes=None):
        mean_reward, reward_std = evaluate_policy(self.model, self.env, n_eval_episodes=episodes)
        print(f"Mean reward over {episodes} episodes is {mean_reward} with a standard deviation of {reward_std}")

    def load_best_model(self):
        best_model = DQN.load(os.path.join(self.save_path, "best_model"))
        return best_model

    def save_model(self):
        return self.model.save(os.path.join(self.save_path, "final_model"))

    def close_env(self):
        self.env.close()

# SpaceInvaders
SpaceInvaders_agent = DQNAgent(name="SpaceInvaders", env_name="SpaceInvadersNoFrameskip-v4")

SpaceInvaders_agent.play_episodes(num_episodes=1, play_type="predict")  # Play and save one episode

SpaceInvaders_agent.train(time_steps=1000000, stop_value=1000)

SpaceInvaders_agent.evaluate_policy(episodes=10)

SpaceInvaders_agent.play_episodes(num_episodes=10, play_type="predict")  # Play and save 10 episodes

SpaceInvaders_agent.close_env()


Playing SpaceInvaders game and saving video for one episode




Episode 1: [8.]




Eval num_timesteps=20000, episode_reward=359.00 +/- 113.68
Episode length: 2898.60 +/- 991.34
New best mean reward!
Eval num_timesteps=40000, episode_reward=310.00 +/- 151.39
Episode length: 2342.60 +/- 547.30
Eval num_timesteps=60000, episode_reward=229.00 +/- 87.77
Episode length: 3412.60 +/- 742.63
Eval num_timesteps=80000, episode_reward=313.00 +/- 107.36
Episode length: 3751.80 +/- 1205.31
Eval num_timesteps=100000, episode_reward=240.00 +/- 160.84
Episode length: 2629.40 +/- 787.56
Eval num_timesteps=120000, episode_reward=198.00 +/- 77.76
Episode length: 2349.40 +/- 334.97
Eval num_timesteps=140000, episode_reward=24.00 +/- 17.15
Episode length: 2269.00 +/- 374.83
Eval num_timesteps=160000, episode_reward=137.00 +/- 116.43
Episode length: 2154.20 +/- 842.01


In [None]:
## PAC MAN GAME

In [None]:
import os
import numpy as np
import gym
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.callbacks import EvalCallback
from PIL import Image
import imageio
from gym.wrappers import ResizeObservation


class DQNAgent:
    def __init__(self, name=None, env_name=None, eval_freq=20000, buffer_size=1000):
        self.name = name
        self.env_name = env_name
        self.eval_freq = eval_freq
        self.buffer_size = buffer_size
        self.log_path = os.path.join('C:/Users/Great Woman/Downloads/my_atari_games/Logs/DQN_' + self.name)
        self.save_path = os.path.join('C:/Users/Great Woman/Downloads/my_atari_games/Saved_Models/DQN_' + self.name)
        self.env = self.make_environment()
        self.model = self._build_dqn()

    def make_environment(self):
        env = gym.make(self.env_name, render_mode="rgb_array")
        env = ResizeObservation(env, 84)
        return DummyVecEnv([lambda: env])

    def _build_dqn(self):
        model = DQN('CnnPolicy', self.env, buffer_size=self.buffer_size, verbose=0, tensorboard_log=self.log_path)
        return model

    def _play_one_episode(self, save_frames=False):
        obs = self.env.reset()
        done = False
        score = 0
        frames = []

        while not done:
            action, _ = self.model.predict(obs)
            obs, reward, done, _ = self.env.step(action)
            score += reward
            if save_frames:
                frame = Image.fromarray(self.env.render())
                frame = np.array(frame)
                frames.append(frame)

        return score, frames

    def play_episodes(self, num_episodes=10, play_type="random"):
        if play_type == "random":
            print(f"Playing the {self.name} game randomly for {num_episodes} episodes")
            scores = [self._play_one_episode()[0] for _ in range(num_episodes)]
            for episode, score in enumerate(scores, 1):
                print(f"Episode {episode}: {score}")

        if play_type == "predict":
            print(f"Playing {self.name} game and saving video for one episode")
            score, frames = self._play_one_episode(save_frames=True)
            print(f"Episode 1: {score}")

            # Save gameplay as an MP4 file
            video_path = os.path.join(self.save_path, f"{self.name}_episode_1.mp4")
            self._save_video(frames, video_path)

    def _save_video(self, frames, video_path, fps=30):
        with imageio.get_writer(video_path, fps=fps) as video:
            for frame in frames:
                video.append_data(frame)

    def train(self, time_steps=None, stop_value=None):
        eval_callback = EvalCallback(self.env, best_model_save_path=self.save_path, log_path=self.log_path, eval_freq=self.eval_freq)
        self.model.learn(total_timesteps=time_steps, callback=eval_callback)

    def evaluate_policy(self, episodes=None):
        mean_reward, reward_std = evaluate_policy(self.model, self.env, n_eval_episodes=episodes)
        print(f"Mean reward over {episodes} episodes is {mean_reward} with a standard deviation of {reward_std}")

    def load_best_model(self):
        best_model = DQN.load(self.save_path + "/best_model")
        return best_model

    def save_model(self):
        return self.model.save(self.save_path)

    def close_env(self):
        self.env.close()

# Usage
Pacman_agent = DQNAgent(name="Pacman", env_name="MsPacmanNoFrameskip-v4")

# Play one episode and save the video
Pacman_agent.play_episodes(num_episodes=1, play_type="predict")

# Train the model
Pacman_agent.train(time_steps=1000000, stop_value=1000)

# Save the trained model
Pacman_agent.save_model()

# Play multiple episodes
Pacman_agent.play_episodes(num_episodes=10, play_type="random")

Pacman_agent.close_env()

