In [1]:
import gym
import numpy as np
import random

from gym.spaces import Box

from metadrive.envs.metadrive_env import MetaDriveEnv
from clip.clip_rewarded_sac import CLIPRewardedSAC
from config import CONFIGS

from stable_baselines3 import PPO

CONFIG = CONFIGS["1"]

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
train_env = MetaDriveEnv()

# model = CLIPRewardedSAC(env=env, config=CONFIG)
# Start with ppo to make sure the observation space is correct
model = PPO("MlpPolicy", train_env, n_steps=4096, verbose=1)
model.learn(total_timesteps=1000, log_interval=4)
train_env.close()

In [None]:
env = MetaDriveEnv(config={"use_render": True})
total_reward = 0
obs, _ = env.reset()

agent = env.agent

try:
    for i in range(1000):
        action, _states = model.predict(obs, deterministic=True)
        obs, reward, done, _, info = env.step(action)
        total_reward += reward
        if done:
            print("episode_reward", total_reward)
            break
finally:
    env.close()

In [2]:
from metadrive.component.road_network import Road

def get_waypoints(agent, delta_s=10):
    roads = []
    navigation = agent.navigation
    checkpoints = navigation.checkpoints

    for i in range(len(checkpoints)-1):
        road = Road(checkpoints[i], checkpoints[i+1])
        roads.append(road)

    waypoints = []
    # How far we are past the last sample (starts at 0, so first sample is at delta_s)
    dist_since_last = 0.0

    for idx, road in enumerate(roads):
        # pick the first lane
        lanes = road.get_lanes(navigation.map.road_network)
        lane = lanes[0]

        # lateral offset at lane center‐line
        middle = (len(lanes) / 2 - 0.5) * lane.width

        # longitudinal start on this lane
        if idx == 0:
            s_start, _ = lane.local_coordinates(agent.position)
        else:
            s_start = 0.0

        lane_len = lane.length
        available = lane_len - s_start

        # How far into this lane to place the *next* sample?
        dist_to_next = (delta_s - dist_since_last) if dist_since_last > 0 else delta_s

        # If there isn't enough room for that first sample, just carry on
        if available < dist_to_next:
            dist_since_last += available
            continue

        # 1) Place the first sample on this lane
        s = s_start + dist_to_next
        x, y = lane.position(s, middle)
        waypoints.append((x, y, 0.5))

        # 2) Place any further full steps of size delta_s
        rem = lane_len - s
        n_more = int(np.floor(rem / delta_s))
        for k in range(1, n_more + 1):
            s_i = s + k * delta_s
            x_i, y_i = lane.position(s_i, middle)
            waypoints.append((x_i, y_i, 0.5))

        # 3) Update how much of a “partial” step is left over
        dist_since_last = rem - n_more * delta_s

    return waypoints

from panda3d.core import LVector3f

env = MetaDriveEnv(config=dict(
    num_scenarios=1,
    start_seed=random.randint(0, 10000),
    random_lane_width=True,
    random_agent_model=True,
    random_lane_num=True,
    use_render=True,

))

obs, info = env.reset()
agent = env.agent

waypoints = get_waypoints(agent)

point_drawer = env.engine.make_point_drawer(scale=5)
point_drawer.reset()
point_drawer.draw_points(waypoints, [[1, 0, 0, 1] for _ in waypoints])

try:
    for i in range(1000):
        obs, reward, terminated, truncated, info = env.step(env.action_space.sample())
finally:
    env.close()

[38;20m[INFO] Environment: MetaDriveEnv[0m
[38;20m[INFO] MetaDrive version: 0.4.3[0m
[38;20m[INFO] Sensors: [lidar: Lidar(), side_detector: SideDetector(), lane_line_detector: LaneLineDetector(), main_camera: MainCamera(1200, 900), dashboard: DashBoard()][0m
[38;20m[INFO] Render Mode: onscreen[0m
[38;20m[INFO] Horizon (Max steps per agent): 1000[0m
[38;20m[INFO] Assets version: 0.4.3[0m
[38;20m[INFO] Known Pipes: CocoaGraphicsPipe[0m
[38;20m[INFO] Start Scenario Index: 8753, Num Scenarios : 1[0m
