In [None]:
import gymnasium as gym
import commonroad_rl.gym_commonroad

# Keyword arguments overwrite the configs defined in
# commonroad_rl/gym_commonroad/configs.yaml
env = gym.make(
    "commonroad-v1",
    action_configs={"action_type": "continuous"},
    goal_configs={"observe_distance_goal_long": True, "observe_distance_goal_lat": True},
    surrounding_configs={
        "observe_lane_circ_surrounding": False,
        "fast_distance_calculation": False,
        "observe_lidar_circle_surrounding": True,
        "lidar_circle_num_beams": 20,
    },
    reward_type="sparse_reward",
    reward_configs={
        "sparse_reward": {
            "reward_goal_reached": 50.0,
            "reward_collision": -100.0,
            "reward_off_road": -50.0,
            "reward_time_out": -10.0,
            "reward_friction_violation": 0.0,
        }
    },
)

try:
    # Gymnasium's reset() returns an (observation, info) pair, not a bare observation.
    observation, info = env.reset()
    for i in range(50):
        # env.render()  # rendered images will be saved under ./img
        action = env.action_space.sample()  # your agent here (this takes random actions)
        # step() returns (obs, reward, terminated, truncated, info); the fourth
        # value is the truncation flag (e.g. a time limit), not a generic "done".
        observation, reward, terminated, truncated, info = env.step(action)
        print(info["detected_obstacles"])
        # An episode is over when it is either terminated or truncated; in both
        # cases the environment must be reset before stepping again.
        if terminated or truncated:
            print("Terminated at i: " if terminated else "Truncated at i: ", i)
            observation, info = env.reset()
finally:
    # Release the environment even if the rollout raises.
    env.close()

In [None]:
import gymnasium as gym
import commonroad_rl.gym_commonroad
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import EvalCallback, CallbackList, BaseCallback
from tqdm import trange

import warnings

# Suppress every Python warning for the rest of this kernel session.
warnings.filterwarnings("ignore")


def make_commonroad_env():
    """Create a ``commonroad-v1`` environment with this notebook's settings.

    The keyword arguments overwrite the configs defined in
    commonroad_rl/gym_commonroad/configs.yaml. A fresh set of config dicts is
    built on every call, so the training and evaluation environments share
    identical settings without sharing any mutable objects.

    Returns:
        The environment object returned by ``gym.make``.
    """
    return gym.make(
        "commonroad-v1",
        action_configs={"action_type": "continuous"},
        goal_configs={"observe_distance_goal_long": True, "observe_distance_goal_lat": True},
        surrounding_configs={
            "observe_lane_circ_surrounding": False,
            "fast_distance_calculation": False,
            "observe_lidar_circle_surrounding": True,
            "lidar_circle_num_beams": 20,
        },
        reward_type="sparse_reward",
        reward_configs={
            "sparse_reward": {
                "reward_goal_reached": 50.0,
                "reward_collision": -100.0,
                "reward_off_road": -50.0,
                "reward_time_out": -10.0,
                "reward_friction_violation": 0.0,
            }
        },
    )


# Environment the PPO agent is trained on.
# No manual env.reset() here: the result was unused, and the algorithm resets
# the environment itself when learn() starts.
env = make_commonroad_env()

model = PPO("MlpPolicy", env, verbose=1, tensorboard_log="./ppo_commonroad_tensorboard/")

# Separate environment instance for the evaluation callback, so evaluation
# episodes do not interfere with the state of the training environment.
eval_env = make_commonroad_env()


class TqdmEvalCallback(BaseCallback):
    """Periodically evaluate the current policy and report mean reward/length.

    Every ``eval_freq`` callback calls, the policy is run deterministically for
    ``n_eval_episodes`` full episodes on ``eval_env`` with a tqdm progress bar.
    The mean episode reward and mean episode length are recorded on the
    training logger and, when ``verbose`` is at least 1, printed.

    Args:
        eval_env: Environment used for evaluation. It must follow the
            Gymnasium API used below: ``reset()`` returns ``(obs, info)`` and
            ``step()`` returns a 5-tuple.
        eval_freq: Run an evaluation every this many callback calls. Must be
            positive.
        n_eval_episodes: Number of episodes per evaluation. Must be positive.
        verbose: Verbosity level; ``0`` silences the printed summary.

    Raises:
        ValueError: If ``eval_freq`` or ``n_eval_episodes`` is not positive.
    """

    def __init__(self, eval_env, eval_freq=1000, n_eval_episodes=5, verbose=1):
        super().__init__(verbose)
        # Fail early with a clear message instead of a ZeroDivisionError in the
        # middle of training (modulo by eval_freq / mean over zero episodes).
        if eval_freq <= 0:
            raise ValueError(f"eval_freq must be positive, got {eval_freq}")
        if n_eval_episodes <= 0:
            raise ValueError(f"n_eval_episodes must be positive, got {n_eval_episodes}")
        self.eval_env = eval_env
        self.eval_freq = eval_freq
        self.n_eval_episodes = n_eval_episodes

    def _run_episode(self):
        """Run one deterministic evaluation episode.

        Returns:
            A ``(total_reward, episode_length)`` tuple for the episode.
        """
        obs, _ = self.eval_env.reset()
        total_reward = 0.0
        ep_len = 0
        done = False
        while not done:
            action, _ = self.model.predict(obs, deterministic=True)
            obs, reward, terminated, truncated, _ = self.eval_env.step(action)
            # The episode ends on either a terminal state or a truncation.
            done = terminated or truncated
            total_reward += reward
            ep_len += 1
        return total_reward, ep_len

    def _on_step(self) -> bool:
        """Evaluate the policy every ``eval_freq`` calls.

        Returns:
            Always ``True``, so this callback never stops training.
        """
        if self.n_calls % self.eval_freq != 0:
            return True

        rewards = []
        lengths = []
        for _ in trange(self.n_eval_episodes, desc=f"[Eval] @ Step {self.num_timesteps}", leave=False):
            total_reward, ep_len = self._run_episode()
            rewards.append(total_reward)
            lengths.append(ep_len)

        mean_reward = sum(rewards) / len(rewards)
        mean_length = sum(lengths) / len(lengths)

        # Record on the training logger so the values appear next to the
        # training metrics (e.g. in TensorBoard) at the next logger dump.
        self.logger.record("eval/mean_reward", float(mean_reward))
        self.logger.record("eval/mean_ep_length", float(mean_length))

        if self.verbose >= 1:
            print(f"\n Evaluation at step {self.num_timesteps}:")
            print(f"   ➤ Mean reward: {mean_reward:.2f} | Mean length: {mean_length:.2f}")
        return True

# Evaluate every 10 callback calls, running 5 episodes per evaluation.
eval_callback = TqdmEvalCallback(eval_env=eval_env, eval_freq=10, n_eval_episodes=5)

callback = CallbackList([eval_callback])

try:
    # Train the agent.
    # NOTE(review): PPO collects a full rollout (n_steps, 2048 by default)
    # before checking total_timesteps, so this presumably runs for more than
    # 100 environment steps — confirm if the exact budget matters.
    model.learn(total_timesteps=100, callback=callback, progress_bar=True)

    # Save the trained model.
    model.save("ppo_commonroad_policy")
finally:
    # Close both environments, even if training or saving fails; the
    # evaluation environment was previously never closed.
    env.close()
    eval_env.close()