In [1]:
import os
import numpy as np
import matplotlib.pyplot as plt

import gymnasium as gym

from stable_baselines3 import SAC
from stable_baselines3.common import results_plotter
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.results_plotter import load_results, ts2xy
from stable_baselines3.common.monitor import Monitor


# PPO

## Create Callback

In [2]:
class SaveOnBestTrainingRewardCallback(BaseCallback):
    """
    Callback for saving a model (the check is done every ``check_freq`` steps)
    based on the training reward (in practice, we recommend using ``EvalCallback``).

    :param check_freq: (int)
    :param log_dir: (str) Path to the folder where the model will be saved.
      It must contains the file created by the ``Monitor`` wrapper.
    :param verbose: (int)
    """

    def __init__(self, check_freq: int, log_dir: str, verbose=1):
        super().__init__(verbose)
        self.check_freq = check_freq
        self.log_dir = log_dir
        self.save_path = os.path.join(log_dir, "best_model")
        self.best_mean_reward = -np.inf

    # def _init_callback(self) -> None:
    #     # Create folder if needed
    #     if self.save_path is not None:
    #         os.makedirs(self.save_path, exist_ok=True)

    def _on_step(self) -> bool:
        if self.n_calls % self.check_freq == 0:

            # Retrieve training reward
            x, y = ts2xy(load_results(self.log_dir), "timesteps")
            if len(x) > 0:
                # Mean training reward over the last 100 episodes
                mean_reward = np.mean(y[-100:])
                if self.verbose > 0:
                    print(f"Num timesteps: {self.num_timesteps}")
                    print(
                        f"Best mean reward: {self.best_mean_reward:.2f} - Last mean reward per episode: {mean_reward:.2f}"
                    )

                # New best model, you could save the agent here
                if mean_reward > self.best_mean_reward:
                    self.best_mean_reward = mean_reward
                    # Example for saving best model
                    if self.verbose > 0:
                        print(f"Saving new best model to {self.save_path}.zip")
                    self.model.save(self.save_path)

        return True

## Plot Functions

In [3]:
def moving_average(values, window):
    """
    Smooth values by doing a moving average
    :param values: (numpy array)
    :param window: (int)
    :return: (numpy array)
    """
    weights = np.repeat(1.0, window) / window
    return np.convolve(values, weights, "valid")


def plot_results(log_folder, title="Learning Curve"):
    """
    plot the results

    :param log_folder: (str) the save location of the results to plot
    :param title: (str) the title of the task to plot
    """
    x, y = ts2xy(load_results(log_folder), "timesteps")
    #y = moving_average(y, window=50)
    # Truncate x
    #x = x[len(x) - len(y) :]

    fig = plt.figure(title)
    plt.plot(x, y)
    plt.xlabel("Number of Timesteps")
    plt.ylabel("Rewards")
    plt.title(title + " Smoothed")
    plt.show()

## Create Gif

In [4]:
import imageio
import numpy as np
#from stable_baselines3 import A2C

def create_gif(env, model_path, path, name):
    #model = A2C.load(model_path, env=env)
    #load model
    model = SAC.load(model_path, env=env)
    images = []

    vec_env = model.get_env()
    obs = vec_env.reset()
    img = vec_env.render()
    for i in range(1000):
        images.append(img)
        action, _ = model.predict(obs)
        obs, _, _ ,_ = vec_env.step(action)
        img = vec_env.render(mode='rgb_array')
    gif_name =  path + name + '.gif'
    imageio.mimsave(gif_name, [np.array(img) for i, img in enumerate(images) if i%2 == 0], duration=90)

## Normal parameters

In [5]:
# Create environment
env = gym.make("CarRacing-v2")

In [6]:
#setup callback

# Create log dir
log_dir = "sac_normal/"
os.makedirs(log_dir, exist_ok=True)

# Wrap the environment
env = Monitor(env, log_dir)

callback = SaveOnBestTrainingRewardCallback(check_freq=1000, log_dir=log_dir)

### Train agent

In [7]:
# Instantiate the agent
model = SAC("MlpPolicy", env, verbose=1, device='cuda', buffer_size=10000)
# Train the agent and display a progress bar
timesteps = 10000
model.learn(total_timesteps=int(timesteps), callback=callback)


Using cpu device
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.
Num timesteps: 1000
Best mean reward: -inf - Last mean reward per episode: -40.81
Saving new best model to sac_normal/best_model.zip
Num timesteps: 2000
Best mean reward: -40.81 - Last mean reward per episode: -38.66
Saving new best model to sac_normal/best_model.zip
Num timesteps: 3000
Best mean reward: -38.66 - Last mean reward per episode: -37.42
Saving new best model to sac_normal/best_model.zip
Num timesteps: 4000
Best mean reward: -37.42 - Last mean reward per episode: -36.10
Saving new best model to sac_normal/best_model.zip
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 1e+03    |
|    ep_rew_mean     | -36.1    |
| time/              |          |
|    episodes        | 4        |
|    fps             | 0        |
|    time_elapsed    | 5115     |
|    total_timesteps | 4000     |
| train/             |          |
|    actor_loss      | -19.8 

<stable_baselines3.sac.sac.SAC at 0x28290463e20>

In [8]:
mean_reward, std_reward = evaluate_policy(model, model.get_env(), n_eval_episodes=5)

In [9]:
print(mean_reward, std_reward)

-24.2725584 5.825913801719336


In [10]:
# Helper from the library
#results_plotter.plot_results(
#    [log_dir], timesteps, results_plotter.X_TIMESTEPS, "sac CarRacing-v2"
#)

In [11]:
#plot_results(log_dir)

In [10]:
 #model.save("sac_normal")
# create_gif(env, "sac_normal/best_model.zip", "sac_normal/", "sac_normal")

: 

### Load pre-trained model

In [7]:
test_env = gym.make("CarRacing-v2", render_mode='human')
dir = os.path.join(log_dir, "best_model.zip")
model = SAC.load(dir, env=test_env)

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.


In [None]:
vec_env = model.get_env()
obs = vec_env.reset()
while True:
    action, _states = model.predict(obs)
    obs, rewards, done, info = vec_env.step(action)
    #obs, rewards, dones, info = test_env.step(action)
    vec_env.render()
    if done:
        break

vec_env.close()

In [26]:
test_env = gym.make("CarRacing-v2", render_mode='rgb_array')
dir = os.path.join(log_dir, "best_model.zip")
create_gif(test_env, dir, log_dir, "sac_car")

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.


## Discrete space

In [17]:
# Create environment
env = gym.make("CarRacing-v2", continuous=False)

In [18]:
#setup callback

# Create log dir
log_dir = "sac_discrete/"
os.makedirs(log_dir, exist_ok=True)

# Wrap the environment
env = Monitor(env, log_dir)

callback = SaveOnBestTrainingRewardCallback(check_freq=1000, log_dir=log_dir)

### Train agent

In [20]:
# Instantiate the agent
model = SAC("MlpPolicy", env, verbose=1, device='cuda', buffer_size=10000)
# Train the agent and display a progress bar
timesteps = 10000
model.learn(total_timesteps=int(timesteps), callback=callback)

Using cpu device
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.


AssertionError: The algorithm only supports (<class 'gymnasium.spaces.box.Box'>,) as action spaces but Discrete(5) was provided

In [None]:
mean_reward, std_reward = evaluate_policy(model, model.get_env(), n_eval_episodes=5)

In [None]:
print(mean_reward, std_reward)

In [None]:
# Helper from the library
#results_plotter.plot_results(
#    [log_dir], timesteps, results_plotter.X_TIMESTEPS, "sac CarRacing-v2"
#)

In [None]:
#plot_results(log_dir)

### Load pre-trained model

In [None]:
test_env = gym.make("CarRacing-v2", render_mode='human', continuous=False)
dir = os.path.join(log_dir, "best_model.zip")
model = SAC.load(dir, env=test_env)

In [None]:
vec_env = model.get_env()
obs = vec_env.reset()
while True:
    action, _states = model.predict(obs)
    obs, rewards, done, info = vec_env.step(action)
    #obs, rewards, dones, info = test_env.step(action)
    vec_env.render()
    if done:
        break

vec_env.close()

In [None]:
test_env = gym.make("CarRacing-v2", render_mode='rgb_array', continuous=False)
dir = os.path.join(log_dir, "best_model")
create_gif(test_env, dir, log_dir, "sac_car")

## Lap completion percentage change 

In [28]:
# Create environment
env = gym.make("CarRacing-v2", lap_complete_percent=0.5)

In [29]:
#setup callback

# Create log dir
log_dir = "sac_lap/"
os.makedirs(log_dir, exist_ok=True)

# Wrap the environment
env = Monitor(env, log_dir)

callback = SaveOnBestTrainingRewardCallback(check_freq=1000, log_dir=log_dir)

### Train agent

In [30]:
# Instantiate the agent
model = SAC("MlpPolicy", env, verbose=1, buffer_size=10000)
# Train the agent and display a progress bar
timesteps = 10000
model.learn(total_timesteps=int(timesteps), callback=callback)

Using cpu device
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.
Num timesteps: 1000
Best mean reward: -inf - Last mean reward per episode: -37.93
Saving new best model to _lap/best_model.zip
Num timesteps: 2000
Best mean reward: -37.93 - Last mean reward per episode: -31.86
Saving new best model to _lap/best_model.zip


In [None]:
mean_reward, std_reward = evaluate_policy(model, model.get_env(), n_eval_episodes=5)

In [None]:
print(mean_reward, std_reward)

In [None]:
# Helper from the library
#results_plotter.plot_results(
#    [log_dir], timesteps, results_plotter.X_TIMESTEPS, "sac CarRacing-v2"
#)

In [None]:
#plot_results(log_dir)

### Load pre-trained model

In [None]:
test_env = gym.make("CarRacing-v2", render_mode='human', continuous=False)
dir = os.path.join(log_dir, "best_model.zip")
model = SAC.load(dir, env=test_env)

In [None]:
vec_env = model.get_env()
obs = vec_env.reset()
while True:
    action, _states = model.predict(obs)
    obs, rewards, done, info = vec_env.step(action)
    #obs, rewards, dones, info = test_env.step(action)
    vec_env.render()
    if done:
        break

vec_env.close()

In [None]:
test_env = gym.make("CarRacing-v2", render_mode='rgb_array')
dir = os.path.join(log_dir, "best_model")
create_gif(test_env, dir, log_dir, "sac_car")

## Train with CNN instead of MLP

In [7]:
env = gym.make("CarRacing-v2", render_mode = "rgb_array")

In [8]:
#setup callback

# Create log dir
log_dir = "sac_cnn/"
os.makedirs(log_dir, exist_ok=True)

# Wrap the environment
env = Monitor(env, log_dir)

callback = SaveOnBestTrainingRewardCallback(check_freq=1000, log_dir=log_dir)

In [9]:
# Instantiate the agent
model = SAC("CnnPolicy", env, verbose=1, buffer_size=10000)
# Train the agent and display a progress bar
timesteps = 10000
model.learn(total_timesteps=int(timesteps), callback=callback)

Using cpu device
Wrapping the env in a DummyVecEnv.
Wrapping the env in a VecTransposeImage.


In [None]:
mean_reward, std_reward = evaluate_policy(model, model.get_env(), n_eval_episodes=5)

print(mean_reward, std_reward)

In [None]:
# Helper from the library
#results_plotter.plot_results(
#    [log_dir], timesteps, results_plotter.X_TIMESTEPS, "sac CarRacing-v2"
#)

In [None]:
#plot_results(log_dir)

## Load pre-trained model

In [5]:
test_env = gym.make("CarRacing-v2", render_mode='human')
dir = os.path.join(log_dir, "best_model.zip")
model = SAC.load(dir, env=test_env)

NameError: name 'log_dir' is not defined

In [None]:
vec_env = model.get_env()
obs = vec_env.reset()
while True:
    action, _states = model.predict(obs)
    obs, rewards, done, info = vec_env.step(action)
    #obs, rewards, dones, info = test_env.step(action)
    vec_env.render()
    if done:
        break

vec_env.close()

In [None]:
test_env = gym.make("CarRacing-v2", render_mode='rgb_array')
dir = os.path.join(log_dir, "best_model")
create_gif(test_env, dir, log_dir, "sac_car")