In [1]:
import carla, json, math, time

# 连接服务器
client = carla.Client("localhost", 2000)
client.set_timeout(10.0)

# 载图（你也可以用 Town04_Opt）
world = client.load_world("Town04_Opt")
bp_lib = world.get_blueprint_library()

# 同步模式 + 固定步长
settings = world.get_settings()
settings.synchronous_mode = True
settings.fixed_delta_seconds = 0.01
world.apply_settings(settings)

print("[WORLD] Town04 ready, dt=0.01s")


[WORLD] Town04 ready, dt=0.01s


In [2]:
# —— 车辆类别到蓝图的简单映射（可自行扩充）——
CATEGORY_TO_BP = {
    "Car": "vehicle.tesla.model3",
    "SUV": "vehicle.nissan.patrol",
    "Truck": "vehicle.carlamotors.carlacola",
    "Bus": "vehicle.volkswagen.t2",
    "Van": "vehicle.mercedes.sprinter",
    "Motorcycle": "vehicle.yamaha.yzf",
    "Bicycle": "vehicle.gazelle.omafiets",
}

def resolve_blueprint_id(bp_lib, value: str, default_id="vehicle.tesla.model3") -> str:
    if not value: return default_id
    v = value.strip()
    if v.startswith("vehicle.") and bp_lib.find(v):
        return v
    key = v.capitalize()
    bp = CATEGORY_TO_BP.get(key, default_id)
    return bp if bp_lib.find(bp) else default_id

def snap_to_lane(world, loc: carla.Location) -> carla.Transform:
    """投影到最近 Driving 车道中心：贴地 + 归一化 yaw。"""
    wp = world.get_map().get_waypoint(loc, project_to_road=True, lane_type=carla.LaneType.Driving)
    if wp is None:
        raise RuntimeError("附近无可驾驶车道")
    tf = wp.transform
    tf.location.z = wp.transform.location.z + 0.3
    tf.rotation.yaw = (tf.rotation.yaw + 180.0) % 360.0 - 180.0
    return tf

def set_speed_along_yaw(actor, speed_mps: float):
    yaw = math.radians(actor.get_transform().rotation.yaw)
    vel = carla.Vector3D(math.cos(yaw)*speed_mps, math.sin(yaw)*speed_mps, 0.0)
    actor.set_target_velocity(vel)

def guess_half_len(bp_id: str, default=2.4):
    """估算半长，用于将净距→车心距（可按需扩充更精确）"""
    table = {
        "vehicle.tesla.model3": 2.35,
        "vehicle.nissan.patrol": 2.70,
        "vehicle.carlamotors.carlacola": 3.50,
        "vehicle.volkswagen.t2": 2.40,
        "vehicle.mercedes.sprinter": 2.70,
        "vehicle.yamaha.yzf": 1.10,
        "vehicle.gazelle.omafiets": 0.90,
    }
    return table.get(bp_id, default)

def advance_along_road(wp: carla.Waypoint, distance_m: float, step=1.0) -> carla.Waypoint:
    """沿道路方向推进 distance_m（正=前，负=后），返回新的 waypoint。"""
    if abs(distance_m) < 1e-3:
        return wp
    remain = abs(distance_m)
    cur = wp
    if distance_m >= 0:
        while remain > 1e-6:
            seg = min(step, remain)
            nxt = cur.next(seg)
            if not nxt: break
            cur = nxt[0]
            remain -= seg
    else:
        while remain > 1e-6:
            seg = min(step, remain)
            prv = cur.previous(seg)
            if not prv: break
            cur = prv[0]
            remain -= seg
    return cur

def choose_lane_by_lateral_offset_fixed(sv_wp: carla.Waypoint, dy_m: float, lane_width: float = 3.5):
    """
    用固定 3.5m 判定横向跨道：
      - |dy| < 0.5*W => 不跨
      - 否则 N = round(|dy|/W)，dy>0 向左跨 N 道；dy<0 向右跨 N 道
    返回：(目标 lane waypoint, 实际跨道数量)
    """
    W = float(lane_width)
    if abs(dy_m) < 0.5 * W:
        return sv_wp, 0
    N = int(round(abs(dy_m) / W))
    cur = sv_wp
    crossed = 0
    for _ in range(N):
        nxt = cur.get_left_lane() if dy_m > 0 else cur.get_right_lane()
        if not nxt or nxt.lane_type != carla.LaneType.Driving:
            break
        cur = nxt
        crossed += 1
    return cur, crossed

def spawn_on_lane_with_scan(world, bp, seed_tf, ahead=15.0, back=8.0, step=1.5, min_clear=2.0):
    """在 seed_tf 所在车道生成；被占则沿该车道前/后扫描最近空位。"""
    m = world.get_map()
    wp = m.get_waypoint(seed_tf.location, project_to_road=True, lane_type=carla.LaneType.Driving)
    if wp is None:
        return None

    def is_clear(tf):
        for v in world.get_actors().filter("vehicle.*"):
            try:
                if v.get_transform().location.distance(tf.location) < min_clear:
                    return False
            except:
                pass
        return True

    base_tf = wp.transform
    base_tf.location.z = max(base_tf.location.z, seed_tf.location.z, 0.3)
    base_tf.rotation = seed_tf.rotation

    if is_clear(base_tf):
        a = world.try_spawn_actor(bp, base_tf)
        if a: return a

    cur, dist = wp, 0.0
    while dist <= ahead:
        nxt = cur.next(step)
        if not nxt: break
        cur = nxt[0]
        tf = cur.transform
        tf.location.z = max(tf.location.z, 0.3)
        tf.rotation = base_tf.rotation
        if is_clear(tf):
            a = world.try_spawn_actor(bp, tf)
            if a: return a
        dist += step

    cur, dist = wp, 0.0
    while dist <= back:
        prv = cur.previous(step)
        if not prv: break
        cur = prv[0]
        tf = cur.transform
        tf.location.z = max(tf.location.z, 0.3)
        tf.rotation = base_tf.rotation
        if is_clear(tf):
            a = world.try_spawn_actor(bp, tf)
            if a: return a
        dist += step

    return None


In [3]:
# 读取你的 JSON（替换成你的路径；注意 r"..." 原始字符串）
json_path = r"C:\Users\mingw\oml-car-variants\build\results\template\scene05.json"
with open(json_path, "r", encoding="utf-8") as f:
    scene = json.load(f)

# 天气（可选）
try:
    weather_name = scene["results"]["bindings"][0]["weatherPattern"]["value"]
    weather = getattr(carla.WeatherParameters, weather_name, carla.WeatherParameters.ClearNoon)
    world.set_weather(weather)
    print(f"[WEATHER] {weather_name}")
except Exception:
    pass

# 取 SV 的位置/速度/类型（SPARQL 风格）
sv_x = sv_y = sv_z = sv_speed = None
sv_type = "Car"
for b in scene.get("results", {}).get("bindings", []):
    if "SVposx" in b and "SVposy" in b and "SVposz" in b and "SVvel" in b:
        sv_x = float(b["SVposx"]["value"])
        sv_y = float(b["SVposy"]["value"])
        sv_z = float(b["SVposz"]["value"])
        sv_speed = float(b["SVvel"]["value"])
        if "SVVehicleType" in b:
            sv_type = b["SVVehicleType"]["value"]
        break
if sv_x is None:
    raise RuntimeError("JSON 里未找到 SV 的 pos/vel/type")

# 用 (x,y,z) 贴路生成，自车 yaw 由道路方向决定
rough_loc = carla.Location(x=sv_x, y=sv_y, z=sv_z)
tf_sv = snap_to_lane(world, rough_loc)

bp_sv_id = resolve_blueprint_id(bp_lib, sv_type, default_id="vehicle.tesla.model3")
bp_sv = bp_lib.find(bp_sv_id)

sv = spawn_on_lane_with_scan(world, bp_sv, tf_sv, ahead=20.0, back=10.0, step=1.5, min_clear=6.0)
if sv is None:
    raise RuntimeError("SV 生成失败：该点及前后都被占或不可放置。")

# 推进一帧再读姿态
try: world.tick()
except: world.wait_for_tick()

sv_tf = sv.get_transform()
print(f"[SV] id={sv.id} type={bp_sv_id} "
      f"@({sv_tf.location.x:.2f},{sv_tf.location.y:.2f},{sv_tf.location.z:.2f}) "
      f"yaw={sv_tf.rotation.yaw:.1f}°")

# 设定匀速
if sv_speed is not None:
    set_speed_along_yaw(sv, sv_speed)

# 把相机移到自车上方
spectator = world.get_spectator()
spectator.set_transform(
    carla.Transform(
        sv_tf.location + carla.Location(z=10.0),
        carla.Rotation(pitch=-30.0, yaw=sv_tf.rotation.yaw)
    )
)


[WEATHER] HardRainNoon
[SV] id=147 type=vehicle.tesla.model3 @(87.31,13.44,11.14) yaw=-179.8°


In [4]:
# 从 JSON 解析 POV 列表
raw_povs = []
for b in scene.get("results", {}).get("bindings", []):
    keys = ("PovVehicleType", "Povposx", "Povposy", "Povposz", "Povvel")
    if all(k in b for k in keys):
        raw_povs.append((
            b["PovVehicleType"]["value"],
            float(b["Povposx"]["value"]),  # dx_gap (保险杠净距，正=前，负=后)
            float(b["Povposy"]["value"]),  # dy (米)
            float(b["Povposz"]["value"]),  # dz
            float(b["Povvel"]["value"])    # 速度 m/s
        ))

# 去重（有些 SPARQL 会重复）
povs_cfg, seen = [], set()
for typ, dxg, dy, dz, spd in raw_povs:
    key = (typ, round(dxg,4), round(dy,4), round(dz,4), round(spd,4))
    if key in seen: continue
    seen.add(key)
    povs_cfg.append({"type": typ, "dx_gap": dxg, "dy": dy, "dz": dz, "speed": spd})

print(f"[JSON] parsed {len(povs_cfg)} POV(s)")
for i, c in enumerate(povs_cfg, 1):
    print(f"  POV{i}: {c}")

# 生成 POV
spawned_povs = []
sv_wp = world.get_map().get_waypoint(sv.get_transform().location, project_to_road=True, lane_type=carla.LaneType.Driving)
sv_half = float(sv.bounding_box.extent.x)

for i, cfg in enumerate(povs_cfg, 1):
    bp_id = resolve_blueprint_id(bp_lib, cfg["type"], default_id="vehicle.tesla.model3")
    bp = bp_lib.find(bp_id)

    dx_gap = cfg["dx_gap"]
    dy     = cfg["dy"]
    dz     = cfg["dz"]
    speed  = cfg["speed"]

    # 净距 -> 车心距离（先用 POV 半长估计）
    pov_half_est = guess_half_len(bp_id)
    dx_center = (dx_gap + sv_half + pov_half_est) if dx_gap >= 0 else (dx_gap - sv_half - pov_half_est)

    # 沿道路推进纵向位移
    wp_long = advance_along_road(sv_wp, dx_center, step=1.0)

    # 用固定 3.5m 的跨道规则选择目标车道
    wp_lane, crossed = choose_lane_by_lateral_offset_fixed(wp_long, dy, lane_width=3.5)

    # 在目标车道中心生成（贴地 + dz；yaw 用车道方向）
    tf = wp_lane.transform
    tf.location.z = wp_lane.transform.location.z + max(dz, 0.0) + 0.3

    pov = spawn_on_lane_with_scan(world, bp, tf, ahead=15.0, back=8.0, step=1.5, min_clear=2.0)
    if pov is None:
        print(f"[POV{i}] 生成失败 @ road={wp_lane.road_id}, lane={wp_lane.lane_id}")
        continue

    set_speed_along_yaw(pov, speed)
    spawned_povs.append(pov)

    # 推进一帧再读姿态
    try: world.tick()
    except: world.wait_for_tick()

    tf_done = pov.get_transform()
    print(f"[POV{i}] id={pov.id} type={bp_id} "
          f"lane(road={wp_lane.road_id}, lane={wp_lane.lane_id}, crossed={crossed}) "
          f"@({tf_done.location.x:.2f},{tf_done.location.y:.2f},{tf_done.location.z:.2f}) "
          f"yaw={tf_done.rotation.yaw:.1f}°")


[JSON] parsed 3 POV(s)
  POV1: {'type': 'Car', 'dx_gap': -5.0, 'dy': -3.5, 'dz': 0.0, 'speed': 6.6667}
  POV2: {'type': 'Car', 'dx_gap': 5.0, 'dy': -3.5, 'dz': 0.0, 'speed': 6.6667}
  POV3: {'type': 'Car', 'dx_gap': 0.0, 'dy': -3.5, 'dz': 0.0, 'speed': 6.6667}
[POV1] id=148 type=vehicle.tesla.model3 lane(road=1091, lane=-3, crossed=1) @(96.98,9.98,11.02) yaw=-179.5°
[POV2] id=149 type=vehicle.tesla.model3 lane(road=1091, lane=-3, crossed=1) @(77.37,9.90,11.23) yaw=-179.8°
[POV3] id=150 type=vehicle.tesla.model3 lane(road=1091, lane=-3, crossed=1) @(82.37,9.92,11.19) yaw=-179.8°


In [5]:
# === 运行场景 ===
sim_time = 5.0   # 总时间 (秒)
dt = 0.01        # 步长 (秒)
steps = int(sim_time / dt)

print(f"[RUN] 开始运行 {sim_time:.1f}s, 共 {steps} 步, dt={dt}s")

for i in range(steps):
    world.tick()
    if (i+1) % 100 == 0:
        print(f"  已运行 {(i+1)*dt:.2f}s")

print("[RUN] 完成")


[RUN] 开始运行 5.0s, 共 500 步, dt=0.01s
  已运行 1.00s
  已运行 2.00s
  已运行 3.00s
  已运行 4.00s
  已运行 5.00s
[RUN] 完成


In [None]:
actors = world.get_actors().filter("vehicle.*")
print(f"[CLEANUP] 共 {len(actors)} 辆，开始销毁...")
for v in actors:
    try:
        v.destroy()
    except Exception as e:
        print(f"  [WARN] 销毁 {v.id} 失败：{e}")
print("[DONE] 车辆清理完成")
