In [3]:
import stable_baselines3
import gymnasium as gym
import numpy as np

# Import the Proximal Policy Optimization (PPO) algorithm from a reinforcement learning library like Stable Baselines3
from stable_baselines3 import PPO
# Import the MlpPolicy, which is a type of policy used in PPO
from stable_baselines3.ppo import MlpPolicy

from stable_baselines3.common.evaluation import evaluate_policy

###############################################################################

# Fixed seed so that "Restart Kernel -> Run All" produces comparable numbers.
# NOTE: Stable Baselines3 does not promise bit-for-bit identical results across
# PyTorch versions or platforms, so this is best-effort reproducibility.
SEED = 0

# Training environment: CartPole-v1, rendering frames as RGB arrays
env = gym.make("CartPole-v1", render_mode="rgb_array")

# PPO agent with a multi-layer-perceptron policy acting in `env`.
# `seed` seeds the pseudo-random generators used by the algorithm and by `env`.
model = PPO(MlpPolicy, env, verbose=0, seed=SEED)

# Equivalent shortcut, kept for reference: Stable Baselines3 also accepts the
# policy and the environment as string ids, and `learn()` returns the model
# itself, so this single line would build a PPO agent, train it for 1000
# timesteps (logging progress because of verbose=1) and store it in `model`.
# model = PPO('MlpPolicy', "CartPole-v1", verbose=1).learn(1000)

# Use a separate environment for evaluation so that evaluation episodes do not
# disturb the state of the training environment
eval_env = gym.make("CartPole-v1", render_mode="rgb_array")
# Seed the evaluation environment once; later resets issued without a seed
# keep drawing from this seeded generator.
eval_env.reset(seed=SEED)

# Random agent, before training (uncomment to get a baseline score)
# mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=100)

# Train the agent for 10000 steps
model.learn(total_timesteps=10_000)

# Evaluate the trained agent over 100 episodes
mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=100)

print(f"mean_reward:{mean_reward:.2f} +/- {std_reward:.2f}")

##################################################
# Custom helper function (a hand-rolled alternative to `evaluate_policy`)
# Define a helper function to evaluate the RL agent
def evaluate(model, num_episodes=100, deterministic=True):
    """
    Evaluate a RL agent by running full episodes and averaging their returns.

    Only works for a model whose environment holds a single sub-environment.

    :param model: (BaseRLModel object) the RL Agent; it must have an
        environment attached (``model.get_env()`` must not return ``None``)
    :param num_episodes: (int) number of episodes to evaluate it (at least 1)
    :param deterministic: (bool) forwarded unchanged to ``model.predict``
    :return: (float) Mean episode reward over the ``num_episodes`` episodes
    :raises ValueError: if ``num_episodes`` is smaller than 1, if the model has
        no environment, or if the environment has more than one sub-environment
    """
    if num_episodes < 1:
        # Averaging zero episodes would silently produce NaN.
        raise ValueError(f"num_episodes must be >= 1, got {num_episodes}")

    # Get the vectorized environment associated with the model
    vec_env = model.get_env()
    if vec_env is None:
        raise ValueError("model has no environment attached; cannot evaluate it")

    # With several sub-environments `done` is an array of length > 1 and its
    # truth value is ambiguous, so fail early with a readable message instead.
    num_envs = getattr(vec_env, "num_envs", 1)
    if num_envs != 1:
        raise ValueError(
            f"evaluate() only supports a single environment, got {num_envs}"
        )

    # Total reward collected in each episode
    all_episode_rewards = []

    for _ in range(num_episodes):
        episode_reward = 0.0
        done = False
        obs = vec_env.reset()

        # Continue taking actions until the episode is done
        while not done:
            # Predict an action using the model and the current observation
            action, _states = model.predict(obs, deterministic=deterministic)

            # The step function returns the next observation, reward, done flag,
            # and additional info; a vectorized environment returns reward and
            # done as length-1 arrays, so unwrap them to plain scalars.
            obs, reward, step_done, _info = vec_env.step(action)
            episode_reward += float(np.ravel(reward)[0])
            done = bool(np.ravel(step_done)[0])

        all_episode_rewards.append(episode_reward)

    # Mean episode reward over all episodes, as a plain Python float
    mean_episode_reward = float(np.mean(all_episode_rewards))

    # Print the mean reward and the number of episodes
    print("Mean reward:", mean_episode_reward, "Num episodes:", num_episodes)

    return mean_episode_reward
##########################################


mean_reward:255.15 +/- 129.76


In [2]:
# Rendering is expected to fail without a display, so launch a virtual one
# (Xvfb serving display :1) in the background and point this process at it.
import os

os.system("Xvfb :1 -screen 0 1024x768x24 &")
os.environ.update(DISPLAY=":1")

import base64
from pathlib import Path

from IPython import display as ipythondisplay


def show_videos(video_path="", prefix=""):
    """
    Display every matching ``.mp4`` file inline as a looping HTML5 video.

    Each file is read fully and embedded in the page as a base64 data URI.
    Taken from https://github.com/eleurent/highway-env

    :param video_path: (str) Path to the folder containing videos
    :param prefix: (str) Filter the videos, showing only the ones whose file
        name starts with this prefix
    """
    # Imported locally: only this function needs it.
    from html import escape

    video_tags = []
    # glob() yields files in filesystem order; sort so the display order is stable.
    for mp4 in sorted(Path(video_path).glob("{}*.mp4".format(prefix))):
        video_b64 = base64.b64encode(mp4.read_bytes())
        video_tags.append(
            """<video alt="{}" autoplay 
                    loop controls style="height: 400px;">
                    <source src="data:video/mp4;base64,{}" type="video/mp4" />
                </video>""".format(
                # Escape the path so characters such as quotes or "&" in a file
                # name cannot break out of the alt attribute.
                escape(str(mp4)), video_b64.decode("ascii")
            )
        )
    ipythondisplay.display(ipythondisplay.HTML(data="<br>".join(video_tags)))


from stable_baselines3.common.vec_env import VecVideoRecorder, DummyVecEnv


def record_video(env_id, model, video_length=500, prefix="", video_folder="videos/"):
    """
    Roll out ``model`` in a fresh environment and save the run as a video.

    :param env_id: (str) id of the environment to create and record,
        e.g. "CartPole-v1"
    :param model: (RL model) agent whose ``predict`` method picks the actions
    :param video_length: (int) number of environment steps to record
    :param prefix: (str) name prefix given to the video recorder
    :param video_folder: (str) folder given to the video recorder for its output
    """
    # Build the environment requested by the caller (previously "CartPole-v1"
    # was hard-coded here and `env_id` was silently ignored).
    eval_env = DummyVecEnv([lambda: gym.make(env_id, render_mode="rgb_array")])
    # Start the video at step=0 and record `video_length` steps
    eval_env = VecVideoRecorder(
        eval_env,
        video_folder=video_folder,
        record_video_trigger=lambda step: step == 0,
        video_length=video_length,
        name_prefix=prefix,
    )

    try:
        obs = eval_env.reset()
        for _step in range(video_length):
            action, _state = model.predict(obs)
            obs, _reward, _done, _info = eval_env.step(action)
    finally:
        # Close the video recorder even if the rollout raises, so the
        # environment is always released.
        eval_env.close()

# Record 500 steps of the trained agent under the "ppo-cartpole" prefix ...
record_video(
    env_id="CartPole-v1",
    model=model,
    video_length=500,
    prefix="ppo-cartpole",
)

# ... then embed every video in "videos" whose file name starts with "ppo".
show_videos(video_path="videos", prefix="ppo")

Saving video to c:\Users\Personal\Desktop\M1 research project\AtariDeepRL\notebooks\intro_week1\videos\ppo-cartpole-step-0-to-step-500.mp4
Moviepy - Building video c:\Users\Personal\Desktop\M1 research project\AtariDeepRL\notebooks\intro_week1\videos\ppo-cartpole-step-0-to-step-500.mp4.
Moviepy - Writing video c:\Users\Personal\Desktop\M1 research project\AtariDeepRL\notebooks\intro_week1\videos\ppo-cartpole-step-0-to-step-500.mp4



                                                               

Moviepy - Done !
Moviepy - video ready c:\Users\Personal\Desktop\M1 research project\AtariDeepRL\notebooks\intro_week1\videos\ppo-cartpole-step-0-to-step-500.mp4
