In [1]:
import gymnasium as gym
from stable_baselines3 import SAC
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv, VecMonitor, VecNormalize
from stable_baselines3.common.vec_env.base_vec_env import VecEnv, VecEnvStepReturn, VecEnvWrapper
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from IPython.display import clear_output
import os

In [2]:
class VecPendulumRewardWrapper(VecEnvWrapper):
    """Vectorized wrapper that replaces the environment reward with a shaped one.

    The shaped reward for each sub-environment is ``1 - s[0]**2 - s[1]**2``,
    where ``s`` is the observation reached by the step (the first two
    components are labelled cart position and pole angle elsewhere in this
    notebook).

    Vectorized environments reset a sub-environment automatically as soon as
    its episode ends, so on a ``done`` step the returned observation already
    belongs to the *next* episode. For those steps the reward is computed from
    ``info["terminal_observation"]`` instead, so the final transition of an
    episode is scored on the state that actually ended it.
    """

    def __init__(self, venv: VecEnv):
        """Wrap ``venv`` without changing its observation or action spaces."""
        super().__init__(venv=venv)

    def reset(self) -> np.ndarray:
        """Reset every sub-environment and return the batched observations."""
        return self.venv.reset()

    def step_async(self, actions: np.ndarray) -> None:
        """Forward the batched actions to the wrapped vectorized environment."""
        self.venv.step_async(actions)

    def step_wait(self) -> VecEnvStepReturn:
        """Collect the step results and substitute the shaped reward.

        Returns:
            ``(obs, reward, done, info)`` where ``obs``, ``done`` and ``info``
            are passed through unchanged and ``reward`` has one entry per
            sub-environment.
        """
        obs, _, done, info = self.venv.step_wait()

        # Work on a copy so the observations handed back to the caller are
        # never modified.
        reached = np.array(obs, copy=True)
        for idx in np.flatnonzero(done):
            terminal_obs = info[idx].get("terminal_observation")
            # Fall back to the returned observation if the wrapped env did not
            # report a terminal observation for this index.
            if terminal_obs is not None:
                reached[idx] = terminal_obs

        reward = 1 - reached[:, 0] ** 2 - reached[:, 1] ** 2
        return obs, reward, done, info

def make_env(env_id, rank, seed=0):
    """Build a zero-argument factory for one sub-environment of a vec env.

    Args:
        env_id: Gymnasium environment id passed to ``gym.make``.
        rank: Index of the sub-environment; added to ``seed`` so every
            worker gets a distinct seed.
        seed: Base seed shared by all workers.

    Returns:
        A callable that creates the environment, seeds it with
        ``seed + rank`` and returns it.
    """
    def _init():
        env = gym.make(env_id)
        # Previously `rank` and `seed` were accepted but ignored; seed here so
        # each worker starts from its own reproducible random stream.
        env.reset(seed=seed + rank)
        return env
    return _init

env_id = 'InvertedPendulum-v4'  # Replace with your MuJoCo environment
num_envs = 16  # Number of parallel environments

# Create the vectorized environment: one worker process per sub-environment.
env = SubprocVecEnv([make_env(env_id, i) for i in range(num_envs)])
# Show the per-environment seeds reported by the vectorized environment.
print(env.seed())
# Add the reward wrapper
env = VecPendulumRewardWrapper(env)
# VecMonitor sits outside the reward wrapper, so it sees the shaped reward.
env = VecMonitor(env)  # Optional: for monitoring and logging

try:
    # buffer_size and total_timesteps are documented as integers by SB3, so
    # pass int literals instead of the float literals 1e6 / 1e5.
    model = SAC(
        'MlpPolicy',
        env,
        learning_rate=0.0003,
        buffer_size=1_000_000,
        learning_starts=100,
        batch_size=256,
        tau=0.005,
        gamma=0.99,
        verbose=0,
    )
    model.learn(total_timesteps=100_000, log_interval=4, progress_bar=True)
    model.save("sac_pendulum")
finally:
    # Shut down the worker processes even if training fails; otherwise they
    # linger once `env` is rebound to another environment.
    env.close()

# del model # remove to demonstrate saving and loading



[1857305660, 1857305661, 1857305662, 1857305663, 1857305664, 1857305665, 1857305666, 1857305667, 1857305668, 1857305669, 1857305670, 1857305671, 1857305672, 1857305673, 1857305674, 1857305675]


Output()

In [3]:
# Load the SAC model stored under "sac_pendulum". No environment is passed to
# load(), and the cells below only call model.predict() on it.
model = SAC.load("sac_pendulum")

class PendulumRewardWrapper(gym.Wrapper):
    """Single-environment wrapper that swaps in a shaped reward.

    Every step returns ``1 - obs[0]**2 - obs[1]**2`` as the reward in place
    of the value produced by the wrapped environment; observations, episode
    flags and info are forwarded untouched.
    """

    def __init__(self, env):
        """Wrap ``env``; no extra state is kept."""
        super().__init__(env=env)

    def reset(self, **kwargs):
        """Delegate to the wrapped environment's ``reset`` and return its result as-is."""
        return self.env.reset(**kwargs)

    def step(self, action):
        """Step the wrapped environment and replace its reward with the shaped one."""
        observation, _, terminated, truncated, info = self.env.step(action)
        first, second = observation[0], observation[1]
        shaped_reward = 1 - first**2 - second**2
        return observation, shaped_reward, terminated, truncated, info

# Single evaluation environment: rgb_array rendering, with the shaped reward
# wrapper applied on top.
env = PendulumRewardWrapper(
    gym.make("InvertedPendulum-v4", render_mode='rgb_array')
)

In [7]:
# Roll out one deterministic episode and log every step to a CSV file.
obs, info = env.reset()
data = []
total_reward = 0
terminated = False
truncated = False
# Stop on either signal: once an episode has terminated the environment must
# be reset before it is stepped again, so looping on `truncated` alone would
# keep stepping a finished episode.
while not (terminated or truncated):
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = env.step(action)
    total_reward += reward
    # One flat row per step: action, observation, step reward, running return.
    data.append(np.concatenate((action, obs, np.array([reward]), np.array([total_reward]))))

names = ["action", "cart_pos", "pole_angle", "cart_velocity", "pole_ang_vel", "reward", "total_reward"]

P = pd.DataFrame(data, columns = names)

name = "testrun"
saveFile = "recordings/" + name
# to_csv does not create missing directories, so make sure the target exists.
os.makedirs(os.path.dirname(saveFile), exist_ok=True)
P.to_csv(saveFile + ".csv")

In [5]:
# Record on a dedicated environment instead of re-wrapping and then closing the
# shared `env`: that keeps this cell re-runnable and leaves `env` usable by
# other cells.
video_env = gym.wrappers.RecordVideo(
    PendulumRewardWrapper(gym.make("InvertedPendulum-v4", render_mode='rgb_array')),
    video_folder="./save_videos1",
    disable_logger=True,
)
try:
    obs, info = video_env.reset()
    terminated, truncated = False, False
    while not (terminated or truncated):
        action, _states = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, info = video_env.step(action)
finally:
    # Always close the recorder, even if the rollout raises, so its resources
    # are released.
    video_env.close()

  logger.warn(
