diff --git a/critical/base_scenario.py b/critical/base_scenario.py new file mode 100644 index 00000000..4a821ce5 --- /dev/null +++ b/critical/base_scenario.py @@ -0,0 +1,336 @@ +# scenarios/base_scenario.py +# 场景基类:定义场景通用接口,所有差异化逻辑由子类实现 +# 按 scenarios/CLAUDE.md 规范 —— 每个子类的天气/速度/行为/危险触发完全不同 + +import os +import time +import carla + +from config.carla_config import ( + CARLA_HOST, CARLA_PORT, CARLA_TIMEOUT, + EGO_VEHICLE_BLUEPRINT, ADV_VEHICLE_BLUEPRINT, + SIMULATION_FPS, FIXED_DELTA_SECONDS, DEFAULT_SPAWN_INDEX, + EPISODE_TIMEOUT, +) +from utils.data_saver import init_result_dir, save_vehicle_log +from utils.sensor_utils import CollisionSensor, DistanceMonitor + + +class BaseScenario: + """ + 场景基类。 + + 子类必须定义: + - name: 场景名称 + - category: 场景类别 + - weather: carla.WeatherParameters 实例 + - ego_speed_ms: 自车初始速度 (m/s) + + 子类可覆盖: + - _setup_world(): 设置天气、同步模式 + - _spawn_actors(): 生成场景专属参与者 + - _trigger_danger(): 触发危险事件 + - run(): 运行场景 + """ + + def __init__(self): + self.name = "BaseScenario" + self.category = "unknown" + + # 天气 —— 子类必须覆盖 + self.weather = carla.WeatherParameters.ClearNoon + + # 速度 —— 子类必须覆盖 + self.ego_speed_ms = 0.0 # m/s + + # CARLA 连接 + self.client = None + self.world = None + self.map = None + + # Actors + self.ego_vehicle = None + self.adv_vehicle = None + self.npc_vehicles = [] + self.pedestrians = [] + + # 传感器 + self.collision_sensor = None + self.distance_monitor = None + + # 数据 + self.logs = [] + self.result_dir = None + self._start_time = None + self._running = False + + # RL 环境引用(延迟创建) + self._env = None + + # ================================================================ + # 生命周期 + # ================================================================ + + def setup(self): + """连接 CARLA,设置世界,生成所有参与者""" + self._connect() + self._setup_world() + self._spawn_actors() + self._attach_sensors() + self._start_time = time.time() + self.result_dir = init_result_dir(self.name)["root"] + + def run(self): + """手动运行演示 —— 子类覆盖 _control_loop()""" + if self.ego_vehicle is None: + self.setup() + self._running = True + try: + self._control_loop() + finally: + self._running = False + + def cleanup(self): + """清理所有参与者 + 全量世界清理 + 恢复异步模式""" + self._running = False + for sensor in [self.collision_sensor]: + if sensor is not None: + try: sensor.destroy() + except Exception: pass + self.collision_sensor = None + self.distance_monitor = None + # 销毁已知 actor + for actor in self.npc_vehicles + self.pedestrians: + if isinstance(actor, (list, tuple)): + for a in actor: + if a is not None: + try: a.destroy() + except Exception: pass + elif actor is not None: + try: actor.destroy() + except Exception: pass + for v in [self.ego_vehicle, self.adv_vehicle]: + if v is not None: + try: v.destroy() + except Exception: pass + self.ego_vehicle = None + self.adv_vehicle = None + self.npc_vehicles.clear() + self.pedestrians.clear() + # 全量清理世界中所有残留 actor + if self.world is not None: + try: + for actor in self.world.get_actors(): + if actor is not None: + try: actor.destroy() + except RuntimeError: pass + except Exception: pass + # 恢复异步模式 + try: + settings = self.world.get_settings() + settings.synchronous_mode = False + settings.fixed_delta_seconds = None + self.world.apply_settings(settings) + except Exception: pass + if self.logs: + save_vehicle_log( + os.path.join(self.result_dir, "logs", "%s_trajectory.csv" % self.name), + self.logs) + + # ================================================================ + # 外部接口(供 experiments 使用) + # ================================================================ + + def get_env_config(self): + """返回供 CarlaEnv 使用的参数字典""" + return { + "weather": self.weather, + "ego_speed_ms": self.ego_speed_ms, + "max_steps": 500, + } + + def create_env(self): + """创建关联的 CarlaEnv 实例""" + from env.carla_env import CarlaEnv + self._env = CarlaEnv(scenario_params=self.get_env_config()) + return self._env + + def spawn_scenario_actors(self, env): + """ + RL 模式专用:在 env.reset() 之后生成场景专属参与者。 + env 已生成自车,此方法只生成 adv / 行人 / 障碍物等。 + 子类可覆盖 _spawn_scenario_actors_impl() 实现差异化。 + """ + self.ego_vehicle = env.ego_vehicle # 复用 env 的自车引用 + self.world = env.world + self._spawn_scenario_actors_impl() + # 让物理引擎和行人AI控制器初始化(关键:否则行人不会动) + for _ in range(10): + self.world.tick() + # 注册给 env + if self.adv_vehicle is not None: + env.register_adv_vehicle(self.adv_vehicle) + if self.pedestrians: + env.register_pedestrians(self.pedestrians) + if self.npc_vehicles: + env.register_npc_vehicles(self.npc_vehicles) + + def _spawn_scenario_actors_impl(self): + """子类覆盖:生成场景专属参与者(不含自车)""" + pass + + def step_callback(self, step_count): + """ + 每步回调(RL 训练循环中调用)。 + 子类覆盖以在特定时机触发场景事件(如延迟切入、急刹等)。 + """ + pass + + def inject_actors_to_env(self, env): + """将已生成的参与者注入 RL 环境(手动模式用)""" + if self.adv_vehicle is not None: + env.register_adv_vehicle(self.adv_vehicle) + if self.pedestrians: + env.register_pedestrians(self.pedestrians) + if self.npc_vehicles: + env.register_npc_vehicles(self.npc_vehicles) + + # ================================================================ + # 内部步骤 + # ================================================================ + + def _connect(self): + self.client = carla.Client(CARLA_HOST, CARLA_PORT) + self.client.set_timeout(CARLA_TIMEOUT) + self.world = self.client.get_world() + self.map = self.world.get_map() + + def _setup_world(self): + """设置天气和同步模式""" + self.world.set_weather(self.weather) + settings = self.world.get_settings() + settings.synchronous_mode = True + settings.fixed_delta_seconds = FIXED_DELTA_SECONDS + self.world.apply_settings(settings) + + def _spawn_actors(self): + """子类覆盖:生成该场景的专属参与者""" + self._spawn_ego() + + def _spawn_ego(self, spawn_index=None): + """生成自车(带重试回退)""" + bp_lib = self.world.get_blueprint_library() + spawn_points = self.map.get_spawn_points() + n = len(spawn_points) + ego_bp = bp_lib.find(EGO_VEHICLE_BLUEPRINT) or bp_lib.filter("vehicle.*")[0] + + # 尝试默认 spawn point 及附近点 + start_idx = (spawn_index if spawn_index is not None + else DEFAULT_SPAWN_INDEX) % n + for offset in range(n): + idx = (start_idx + offset) % n + self.ego_vehicle = self.world.try_spawn_actor(ego_bp, spawn_points[idx]) + if self.ego_vehicle is not None: + break + + if self.ego_vehicle is None: + raise RuntimeError( + "自车生成失败: 所有 %d 个 spawn point 均被占用,请重启 CARLA 或切换地图" % n) + + if self.ego_speed_ms > 0: + forward = self.ego_vehicle.get_transform().get_forward_vector() + self.ego_vehicle.set_target_velocity(carla.Vector3D( + float(forward.x * self.ego_speed_ms), + float(forward.y * self.ego_speed_ms), + 0.0)) + + def _spawn_adv_in_front(self, distance_m, speed_ms=None): + """在自车前方生成对抗车辆(多层回退保证成功)""" + self.world.tick() # 让 CARLA 注册自车位置 + ego_loc = self.ego_vehicle.get_location() + ego_rot = self.ego_vehicle.get_transform().rotation + bp_lib = self.world.get_blueprint_library() + adv_bp = bp_lib.find(ADV_VEHICLE_BLUEPRINT) or bp_lib.filter("vehicle.*")[1] + + # 计算自车前方方向向量 + forward = self.ego_vehicle.get_transform().get_forward_vector() + + # 策略1: 沿自车前方偏移 + for dist in [distance_m, distance_m + 5, distance_m + 10, distance_m + 15]: + for lat in [0.0, 1.5, -1.5, 3.0, -3.0]: + sp = carla.Transform( + carla.Location( + x=ego_loc.x + forward.x * dist + forward.y * lat, + y=ego_loc.y + forward.y * dist - forward.x * lat, + z=ego_loc.z + 0.5, + ), + ego_rot, + ) + self.adv_vehicle = self.world.try_spawn_actor(adv_bp, sp) + if self.adv_vehicle is not None: + break + if self.adv_vehicle is not None: + break + + # 策略2: 用地图 spawn points 中离自车前方最近的空闲点 + if self.adv_vehicle is None: + spawn_points = self.map.get_spawn_points() + best_sp = None + best_dist = float("inf") + for sp in spawn_points: + d = sp.location.distance(ego_loc) + dx = sp.location.x - ego_loc.x + dy = sp.location.y - ego_loc.y + # 判断在自车前方(点积 >0) + if forward.x * dx + forward.y * dy > 0 and d > 5: + if d < best_dist: + # 确认该点可用 + test = self.world.try_spawn_actor(adv_bp, sp) + if test is not None: + if best_sp is not None: + test.destroy() + best_sp = sp + best_dist = d + # 否则该点被占用,跳过 + + if best_sp is not None: + self.adv_vehicle = self.world.try_spawn_actor(adv_bp, best_sp) + + if self.adv_vehicle is None: + raise RuntimeError( + "对抗车辆生成失败: 地图=%s ego=(%.1f,%.1f) distance=%d" + % (self.map.name, ego_loc.x, ego_loc.y, distance_m)) + + if speed_ms is not None: + fwd = self.adv_vehicle.get_transform().get_forward_vector() + self.adv_vehicle.set_target_velocity(carla.Vector3D( + float(fwd.x * speed_ms), float(fwd.y * speed_ms), 0.0)) + return self.adv_vehicle + + def _attach_sensors(self): + if self.ego_vehicle is not None: + self.collision_sensor = CollisionSensor(self.ego_vehicle) + + def _control_loop(self): + """默认手动控制循环""" + self.ego_vehicle.set_autopilot(True) + max_ticks = int(EPISODE_TIMEOUT * SIMULATION_FPS) + for tick in range(max_ticks): + if not self._running: + break + self.world.tick() + if tick % 10 == 0: + self._record_frame(tick) + if self.collision_sensor is not None and self.collision_sensor.collided: + self._record_frame(tick) + break + + def _record_frame(self, tick): + ego_loc = self.ego_vehicle.get_location() + vel = self.ego_vehicle.get_velocity() + ego_speed = (vel.x ** 2 + vel.y ** 2 + vel.z ** 2) ** 0.5 + self.logs.append([ + round(time.time() - self._start_time, 2), + round(ego_loc.x, 2), round(ego_loc.y, 2), round(ego_loc.z, 2), + round(ego_speed * 3.6, 2), + ]) diff --git a/critical/combined_fog_ghost.py b/critical/combined_fog_ghost.py new file mode 100644 index 00000000..95a553f8 --- /dev/null +++ b/critical/combined_fog_ghost.py @@ -0,0 +1,129 @@ +# 场景10: 雾天鬼探头 — 浓雾90% 货车相邻车道, 行人车头盲区冲出, 最高危 +import carla +from scenarios.base_scenario import BaseScenario +from utils.carla_utils import spawn_pedestrian_at + + +class FogGhostScenario(BaseScenario): + def __init__(self): + super().__init__() + self.name = "combined_fog_ghost" + self.category = "multi_factor_coupled" + self.weather = carla.WeatherParameters( + cloudiness=90.0, precipitation=0.0, precipitation_deposits=0.0, + wind_intensity=10.0, fog_density=90.0, fog_distance=15.0, + wetness=20.0, sun_azimuth_angle=90.0, sun_altitude_angle=45.0) + self.ego_speed_ms = 25.0 / 3.6 + self._trigger_step = int(4.0 * 20) # 4s 行人冲出 + self._cross_time = int(2.0 * 20) # 横穿 2s + self._pause_time = int(1.0 * 20) # 车头前停 1s + + def get_env_config(self): + cfg = super().get_env_config() + cfg["action_space"] = 2 + return cfg + + def _spawn_actors(self): + self._spawn_ego() + self._spawn_obstacle_and_pedestrian() + self.world.tick() + + def _spawn_scenario_actors_impl(self): + self._phase = "idle" + self._phase_start = 0 + self._ped_start_y = None + self._spawn_obstacle_and_pedestrian() + + def _spawn_obstacle_and_pedestrian(self): + ego_tf = self.ego_vehicle.get_transform() + ego_loc = ego_tf.location + fwd = ego_tf.get_forward_vector() + right = ego_tf.get_right_vector() + bp_lib = self.world.get_blueprint_library() + + # 货车在相邻车道,自车前方 32m(同鬼探头场景) + truck_dist = 32.0 + truck_x = ego_loc.x + fwd.x * truck_dist + right.x * 3.5 + truck_y = ego_loc.y + fwd.y * truck_dist + right.y * 3.5 + truck_bp = bp_lib.filter("vehicle.carlamotors.carlacola")[0] \ + if bp_lib.filter("vehicle.carlamotors.carlacola") else bp_lib.filter("vehicle.*")[2] + truck_sp = carla.Transform( + carla.Location(x=truck_x, y=truck_y, z=ego_loc.z + 0.5), ego_tf.rotation) + obs = self.world.try_spawn_actor(truck_bp, truck_sp) + if obs is not None: + obs.apply_control(carla.VehicleControl(brake=1.0)) + self.npc_vehicles.append(obs) + + # 行人生成在货车车头前方 3m(雾+盲区双重隐蔽) + ped_x = ego_loc.x + fwd.x * (truck_dist + 3.0) + ped_y = ego_loc.y + right.y * 5.0 + self._ped_yaw = ego_tf.rotation.yaw - 90.0 + + ped_loc = carla.Location(x=ped_x, y=ped_y, z=ego_loc.z + 0.5) + ped, ctrl = spawn_pedestrian_at(self.world, ped_loc, speed_ms=0.0) + ped.set_transform(carla.Transform(ped_loc, carla.Rotation(yaw=self._ped_yaw))) + self.pedestrians.append((ped, ctrl)) + self._ped = ped + self._start_y = ped_y + self._lane_y = ego_loc.y + self._target_y = ego_loc.y - right.y * 5.0 + + # ================================================================ + # RL 回调 + # ================================================================ + def step_callback(self, step_count): + if self._ped is None: return + if step_count >= self._trigger_step and self._phase == "idle": + self._phase = "first_half"; self._phase_start = step_count + self._ped_start_y = self._ped.get_location().y + if self._phase == "first_half": + self._move_ped(step_count, self._ped_start_y, self._lane_y, self._cross_time, "paused") + elif self._phase == "paused": + if step_count - self._phase_start >= self._pause_time: + self._phase = "second_half"; self._phase_start = step_count + self._ped_start_y = self._ped.get_location().y + elif self._phase == "second_half": + self._move_ped(step_count, self._ped_start_y, self._target_y, self._cross_time, "done") + + def _move_ped(self, step_count, start_y, end_y, duration, next_phase): + elapsed = step_count - self._phase_start + if elapsed >= duration: + self._set_ped_y(end_y); self._phase = next_phase; self._phase_start = step_count + else: + self._set_ped_y(start_y + (end_y - start_y) * (elapsed / duration)) + + def _set_ped_y(self, y): + loc = self._ped.get_location(); loc.y = y + self._ped.set_transform(carla.Transform(loc, carla.Rotation(yaw=self._ped_yaw))) + + # ================================================================ + # 手动模式 + # ================================================================ + def _control_loop(self): + phase, phase_start = "idle", 0 + ped_start_y = None; ego_braked = False + for tick in range(int(60 * 20)): + if not self._running: break + self.ego_vehicle.apply_control( + carla.VehicleControl(throttle=0.0, brake=0.9, steer=0.0) + if ego_braked else carla.VehicleControl(throttle=0.15, steer=0.0)) + self.world.tick() + if tick >= self._trigger_step and phase == "idle" and self._ped: + phase, phase_start = "first_half", tick + ped_start_y = self._ped.get_location().y; ego_braked = True + if phase == "first_half" and self._ped: + e = tick - phase_start + if e >= self._cross_time: self._set_ped_y(self._lane_y); phase, phase_start = "paused", tick + else: self._set_ped_y(ped_start_y + (self._lane_y - ped_start_y) * (e / self._cross_time)) + elif phase == "paused": + if tick - phase_start >= self._pause_time: + phase, phase_start = "second_half", tick; ped_start_y = self._ped.get_location().y + elif phase == "second_half" and self._ped: + e = tick - phase_start + if e >= self._cross_time: self._set_ped_y(self._target_y); phase = "done" + else: self._set_ped_y(ped_start_y + (self._target_y - ped_start_y) * (e / self._cross_time)) + if tick % 5 == 0: self._record_frame(tick) + if self.collision_sensor and self.collision_sensor.collided: break + + def cleanup(self): + self._ped = None; super().cleanup() diff --git a/critical/combined_night_pedestrian.py b/critical/combined_night_pedestrian.py new file mode 100644 index 00000000..0aa8c0e6 --- /dev/null +++ b/critical/combined_night_pedestrian.py @@ -0,0 +1,52 @@ +# 场景9: 夜间行人横穿(耦合) — 晴天夜间亮度5% ego30 行人从人行道黑暗中横穿 +import carla +from scenarios.base_scenario import BaseScenario +from utils.carla_utils import spawn_pedestrian_at, walk_to_location, apply_brake + + +class NightPedestrianScenario(BaseScenario): + def __init__(self): + super().__init__() + self.name = "combined_night_pedestrian" + self.category = "multi_factor_coupled" + self.weather = carla.WeatherParameters( + cloudiness=10.0, precipitation=0.0, precipitation_deposits=0.0, + wind_intensity=0.0, fog_density=0.0, fog_distance=0.0, + wetness=0.0, sun_azimuth_angle=180.0, sun_altitude_angle=-90.0) + self.ego_speed_ms = 30.0 / 3.6 + + def get_env_config(self): + cfg = super().get_env_config() + cfg["action_space"] = 2 + return cfg + + def _spawn_actors(self): + self._spawn_ego() + self._spawn_pedestrian() + self.world.tick() + + def _spawn_scenario_actors_impl(self): + self._spawn_pedestrian() + + def _spawn_pedestrian(self): + ego_loc = self.ego_vehicle.get_location() + # 行人从右侧车旁人行道出现(黑暗中无灯光) + ped_loc = carla.Location(x=ego_loc.x + 20, y=ego_loc.y - 6.0, z=ego_loc.z) + ped, ctrl = spawn_pedestrian_at(self.world, ped_loc, speed_ms=1.5) + walk_to_location(ctrl, self.world, + carla.Location(x=ped_loc.x, y=ego_loc.y + 6.0, z=ped_loc.z)) + self.pedestrians.append((ped, ctrl)) + + def _control_loop(self): + braked = False + for tick in range(int(60 * 20)): + if not self._running: break + if braked: + self.ego_vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.8, steer=0.0)) + else: + self.ego_vehicle.apply_control(carla.VehicleControl(throttle=0.25, steer=0.0)) + self.world.tick() + if tick == int(3.0 * 20): + braked = True + if tick % 5 == 0: self._record_frame(tick) + if self.collision_sensor and self.collision_sensor.collided: break diff --git a/critical/cut_in_scenario.py b/critical/cut_in_scenario.py new file mode 100644 index 00000000..4f52ba0e --- /dev/null +++ b/critical/cut_in_scenario.py @@ -0,0 +1,138 @@ +# 场景5: 车辆强行加塞 — 旁车同行后直接位移切入自车车道 +import carla +from scenarios.base_scenario import BaseScenario + + +class CutInScenario(BaseScenario): + def __init__(self): + super().__init__() + self.name = "cut_in" + self.category = "vehicle_adversarial" + self.weather = carla.WeatherParameters( + cloudiness=5.0, precipitation=0.0, precipitation_deposits=0.0, + wind_intensity=0.0, fog_density=0.0, fog_distance=0.0, + wetness=0.0, sun_azimuth_angle=90.0, sun_altitude_angle=45.0) + self.ego_speed_ms = 50.0 / 3.6 + self.adv_speed_ms = 55.0 / 3.6 + self._cut_trigger_step = int(4.0 * 20) # RL: 4s 后切入 + self._cut_duration_steps = int(1.5 * 20) # 1.5s 完成切入 + self._cut_triggered = False + self._cut_start_step = 0 + self._adv_start_y = None + + def get_env_config(self): + cfg = super().get_env_config() + cfg["action_space"] = 2 + return cfg + + def _spawn_actors(self): + self._spawn_ego() + self._spawn_adv_side() + self.world.tick() + + def _spawn_scenario_actors_impl(self): + self._cut_triggered = False + self._cut_start_step = 0 + self._adv_start_y = None + self._spawn_adv_side() + + def _spawn_adv_side(self): + self.world.tick() + ego_tf = self.ego_vehicle.get_transform() + ego_loc = ego_tf.location + fwd = ego_tf.get_forward_vector() + right = ego_tf.get_right_vector() + + bp_lib = self.world.get_blueprint_library() + adv_bp = bp_lib.find("vehicle.audi.a2") or bp_lib.filter("vehicle.*")[1] + + # 旁车在右侧车道,自车后方 15m + adv_loc = carla.Location( + x=ego_loc.x - fwd.x * 15 + right.x * 3.5, + y=ego_loc.y - fwd.y * 15 + right.y * 3.5, + z=ego_loc.z + 0.5) + adv_sp = carla.Transform(adv_loc, ego_tf.rotation) + self.adv_vehicle = self.world.try_spawn_actor(adv_bp, adv_sp) + if self.adv_vehicle is None: + raise RuntimeError("加塞车辆生成失败") + + # 先纯直行 + self.adv_vehicle.set_target_velocity(carla.Vector3D( + float(fwd.x * self.adv_speed_ms), + float(fwd.y * self.adv_speed_ms), 0.0)) + + self._adv_start_y = adv_loc.y # 记录右侧车道 y 坐标 + self._ego_lane_y = ego_loc.y # 记录自车车道 y 坐标 + + # ================================================================ + # RL 模式回调:渐进位移旁车到自车车道 + # ================================================================ + def step_callback(self, step_count): + if self.adv_vehicle is None: + return + + if step_count >= self._cut_trigger_step and not self._cut_triggered: + self._cut_triggered = True + self._cut_start_step = step_count + self._adv_start_y = self.adv_vehicle.get_location().y + self._ego_lane_y = self.ego_vehicle.get_location().y + + if self._cut_triggered: + elapsed = step_count - self._cut_start_step + if elapsed <= self._cut_duration_steps: + # 线性插值:从右车道 y 平滑过渡到自车车道 y + t = elapsed / self._cut_duration_steps + target_y = self._adv_start_y + (self._ego_lane_y - self._adv_start_y) * t + loc = self.adv_vehicle.get_location() + loc.y = target_y + self.adv_vehicle.set_location(loc) + + # ================================================================ + # 手动模式 + # ================================================================ + def _control_loop(self): + cut_start = int(4.0 * 20) # 4s 后切入 + cut_dur = int(1.5 * 20) + ego_braked = False + adv_y0 = None + ego_y0 = None + + for tick in range(int(60 * 20)): + if not self._running: + break + + if ego_braked: + self.ego_vehicle.apply_control( + carla.VehicleControl(throttle=0.0, brake=0.8, steer=0.0)) + else: + self.ego_vehicle.apply_control( + carla.VehicleControl(throttle=0.4, steer=0.0)) + + self.world.tick() + + # 2s 后旁车开始位移切入自车车道 + if tick == cut_start and self.adv_vehicle: + adv_y0 = self.adv_vehicle.get_location().y + ego_y0 = self.ego_vehicle.get_location().y + + if adv_y0 is not None and tick < cut_start + cut_dur: + elapsed = tick - cut_start + t = elapsed / cut_dur + target_y = adv_y0 + (ego_y0 - adv_y0) * t + loc = self.adv_vehicle.get_location() + loc.y = target_y + self.adv_vehicle.set_location(loc) + # 同时打方向产生视觉效果 + self.adv_vehicle.apply_control( + carla.VehicleControl(throttle=0.7, steer=-0.6)) + + # 切入完成后回正 + 自车急刹 + if adv_y0 is not None and tick >= cut_start + cut_dur and not ego_braked: + self.adv_vehicle.apply_control( + carla.VehicleControl(throttle=0.4, steer=0.0)) + ego_braked = True + + if tick % 5 == 0: + self._record_frame(tick) + if self.collision_sensor and self.collision_sensor.collided: + break diff --git a/critical/emergency_brake.py b/critical/emergency_brake.py new file mode 100644 index 00000000..94c3f1d4 --- /dev/null +++ b/critical/emergency_brake.py @@ -0,0 +1,54 @@ +# 场景4: 前车紧急制动 — 晴天白天 ego55 adv50 2s后adv急刹-8m/s² 距离10m +import carla +from scenarios.base_scenario import BaseScenario +from utils.carla_utils import apply_brake + + +class EmergencyBrakeScenario(BaseScenario): + def __init__(self): + super().__init__() + self.name = "emergency_brake" + self.category = "vehicle_adversarial" + self.weather = carla.WeatherParameters( + cloudiness=5.0, precipitation=0.0, precipitation_deposits=0.0, + wind_intensity=0.0, fog_density=0.0, fog_distance=0.0, + wetness=0.0, sun_azimuth_angle=90.0, sun_altitude_angle=45.0) + self.ego_speed_ms = 55.0 / 3.6 # 55 km/h + self.adv_speed_ms = 50.0 / 3.6 # 50 km/h + + def get_env_config(self): + cfg = super().get_env_config() + cfg["action_space"] = 2 + return cfg + + def _spawn_actors(self): + self._spawn_ego() + self._spawn_adv_in_front(10.0, self.adv_speed_ms) # 间距仅 10m + self.world.tick() + + def _spawn_scenario_actors_impl(self): + self._brake_done = False # 每 episode 重置 + self._spawn_adv_in_front(10.0, self.adv_speed_ms) + + def step_callback(self, step_count): + """RL: 2s 后前车急刹""" + if step_count >= 40 and not self._brake_done: # 40步 = 2s @20fps + apply_brake(self.adv_vehicle, 1.0) + self._brake_done = True + + def _control_loop(self): + brake_tick = int(2.0 * 20); done_brake = False + for tick in range(int(60 * 20)): + if not self._running: break + if done_brake: + self.ego_vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.9, steer=0.0)) + else: + self.ego_vehicle.apply_control(carla.VehicleControl(throttle=0.4, steer=0.0)) + self.world.tick() + if tick >= brake_tick and not done_brake: + apply_brake(self.adv_vehicle, 1.0); done_brake = True + if tick % 5 == 0: self._record_frame(tick) + if self.collision_sensor and self.collision_sensor.collided: break + if done_brake and self.adv_vehicle: + v = self.adv_vehicle.get_velocity() + if (v.x**2+v.y**2)**0.5 < 0.1 and tick > brake_tick + 60: break diff --git a/critical/ghost_peek.py b/critical/ghost_peek.py new file mode 100644 index 00000000..37d995c2 --- /dev/null +++ b/critical/ghost_peek.py @@ -0,0 +1,153 @@ +# 场景7: 鬼探头 — 货车停在相邻车道前方, 行人从货车车头冲出, 自车急刹 +import carla +from scenarios.base_scenario import BaseScenario +from utils.carla_utils import spawn_pedestrian_at + + +class GhostPeekScenario(BaseScenario): + def __init__(self): + super().__init__() + self.name = "ghost_peek" + self.category = "pedestrian_danger" + self.weather = carla.WeatherParameters( + cloudiness=10.0, precipitation=0.0, precipitation_deposits=0.0, + wind_intensity=0.0, fog_density=0.0, fog_distance=0.0, + wetness=0.0, sun_azimuth_angle=90.0, sun_altitude_angle=45.0) + self.ego_speed_ms = 40.0 / 3.6 + self._trigger_step = int(2.3 * 20) # 2.3s 行人冲出(确保自车刹停在行人前) + self._cross_time = int(2.0 * 20) # 横穿 2s + self._pause_time = int(1.0 * 20) # 车头前停 1s + + def get_env_config(self): + cfg = super().get_env_config() + cfg["action_space"] = 2 + return cfg + + def _spawn_actors(self): + self._spawn_ego() + self._spawn_obstacle_and_pedestrian() + self.world.tick() + + def _spawn_scenario_actors_impl(self): + self._phase = "idle" + self._phase_start = 0 + self._ped_start_y = None + self._spawn_obstacle_and_pedestrian() + + def _spawn_obstacle_and_pedestrian(self): + ego_tf = self.ego_vehicle.get_transform() + ego_loc = ego_tf.location + fwd = ego_tf.get_forward_vector() + right = ego_tf.get_right_vector() + bp_lib = self.world.get_blueprint_library() + + # 货车在相邻车道,自车前方 32m + truck_dist = 32.0 + truck_x = ego_loc.x + fwd.x * truck_dist + right.x * 3.5 + truck_y = ego_loc.y + fwd.y * truck_dist + right.y * 3.5 + truck_bp = bp_lib.filter("vehicle.carlamotors.carlacola")[0] \ + if bp_lib.filter("vehicle.carlamotors.carlacola") else bp_lib.filter("vehicle.*")[2] + truck_sp = carla.Transform( + carla.Location(x=truck_x, y=truck_y, z=ego_loc.z + 0.5), + ego_tf.rotation) + obs = self.world.try_spawn_actor(truck_bp, truck_sp) + if obs is not None: + obs.apply_control(carla.VehicleControl(brake=1.0)) + self.npc_vehicles.append(obs) + + # 行人生成在货车车头前方 3m(盲区死角) + ped_x = ego_loc.x + fwd.x * (truck_dist + 3.0) + ped_y = ego_loc.y + right.y * 5.0 + self._ped_yaw = ego_tf.rotation.yaw - 90.0 + + ped_loc = carla.Location(x=ped_x, y=ped_y, z=ego_loc.z + 0.5) + ped, ctrl = spawn_pedestrian_at(self.world, ped_loc, speed_ms=0.0) + ped.set_transform(carla.Transform(ped_loc, carla.Rotation(yaw=self._ped_yaw))) + self.pedestrians.append((ped, ctrl)) + self._ped = ped + self._start_y = ped_y + self._lane_y = ego_loc.y # 自车车道(停留点) + self._target_y = ego_loc.y - right.y * 5.0 # 对侧人行道 + + # ================================================================ + # RL 回调 + # ================================================================ + def step_callback(self, step_count): + if self._ped is None: + return + + if step_count >= self._trigger_step and self._phase == "idle": + self._phase = "first_half" + self._phase_start = step_count + self._ped_start_y = self._ped.get_location().y + + if self._phase == "first_half": + self._move_ped(step_count, self._ped_start_y, self._lane_y, + self._cross_time, "paused") + elif self._phase == "paused": + if step_count - self._phase_start >= self._pause_time: + self._phase = "second_half" + self._phase_start = step_count + self._ped_start_y = self._ped.get_location().y + elif self._phase == "second_half": + self._move_ped(step_count, self._ped_start_y, self._target_y, + self._cross_time, "done") + + def _move_ped(self, step_count, start_y, end_y, duration, next_phase): + elapsed = step_count - self._phase_start + if elapsed >= duration: + self._set_ped_y(end_y) + self._phase = next_phase + self._phase_start = step_count + else: + self._set_ped_y(start_y + (end_y - start_y) * (elapsed / duration)) + + def _set_ped_y(self, y): + loc = self._ped.get_location() + loc.y = y + self._ped.set_transform(carla.Transform(loc, carla.Rotation(yaw=self._ped_yaw))) + + # ================================================================ + # 手动模式 + # ================================================================ + def _control_loop(self): + phase, phase_start = "idle", 0 + ped_start_y = None + ego_braked = False + + for tick in range(int(60 * 20)): + if not self._running: break + + self.ego_vehicle.apply_control( + carla.VehicleControl(throttle=0.0, brake=0.9, steer=0.0) + if ego_braked else carla.VehicleControl(throttle=0.3, steer=0.0)) + self.world.tick() + + if tick >= self._trigger_step and phase == "idle" and self._ped: + phase, phase_start = "first_half", tick + ped_start_y = self._ped.get_location().y + ego_braked = True + + if phase == "first_half" and self._ped: + e = tick - phase_start + if e >= self._cross_time: + self._set_ped_y(self._lane_y); phase, phase_start = "paused", tick + else: + self._set_ped_y(ped_start_y + (self._lane_y - ped_start_y) * (e / self._cross_time)) + elif phase == "paused": + if tick - phase_start >= self._pause_time: + phase, phase_start = "second_half", tick + ped_start_y = self._ped.get_location().y + elif phase == "second_half" and self._ped: + e = tick - phase_start + if e >= self._cross_time: + self._set_ped_y(self._target_y); phase = "done" + else: + self._set_ped_y(ped_start_y + (self._target_y - ped_start_y) * (e / self._cross_time)) + + if tick % 5 == 0: self._record_frame(tick) + if self.collision_sensor and self.collision_sensor.collided: break + + def cleanup(self): + self._ped = None + super().cleanup() diff --git a/critical/heavy_fog.py b/critical/heavy_fog.py new file mode 100644 index 00000000..8c000c8e --- /dev/null +++ b/critical/heavy_fog.py @@ -0,0 +1,58 @@ +# 场景2: 浓雾巡航 — 雾100% 能见度20m 自车低速跟车 前车偶有减速 +import carla +from scenarios.base_scenario import BaseScenario +from utils.carla_utils import apply_brake + + +class HeavyFogScenario(BaseScenario): + def __init__(self): + super().__init__() + self.name = "heavy_fog" + self.category = "extreme_weather" + self.weather = carla.WeatherParameters( + cloudiness=80.0, precipitation=0.0, precipitation_deposits=0.0, + wind_intensity=10.0, fog_density=100.0, fog_distance=20.0, + wetness=0.0, sun_azimuth_angle=90.0, sun_altitude_angle=55.0) + self.ego_speed_ms = 28.0 / 3.6 + self.adv_speed_ms = 25.0 / 3.6 + + def get_env_config(self): + cfg = super().get_env_config() + cfg["action_space"] = 2 + cfg["brake_mode"] = "coast" # 松油门滑行,永不刹停 + return cfg + + def _spawn_actors(self): + self._spawn_ego() + self._spawn_adv_in_front(30.0, self.adv_speed_ms) + self.world.tick() + + def _spawn_scenario_actors_impl(self): + self._spawn_adv_in_front(30.0, self.adv_speed_ms) + + def step_callback(self, step_count): + """每 4 秒前车轻微减速,测试雾天跟车反应""" + if self.adv_vehicle is None: + return + cycle = step_count % 80 # 4s 周期 + if cycle == 0: + fwd = self.adv_vehicle.get_transform().get_forward_vector() + self.adv_vehicle.set_target_velocity(carla.Vector3D( + float(fwd.x * 22.0 / 3.6), float(fwd.y * 22.0 / 3.6), 0.0)) + elif cycle == 25: + fwd = self.adv_vehicle.get_transform().get_forward_vector() + self.adv_vehicle.set_target_velocity(carla.Vector3D( + float(fwd.x * self.adv_speed_ms), + float(fwd.y * self.adv_speed_ms), 0.0)) + + def _control_loop(self): + for tick in range(int(60 * 20)): + if not self._running: break + fwd = self.ego_vehicle.get_transform().get_forward_vector() + self.ego_vehicle.set_target_velocity(carla.Vector3D( + float(fwd.x * self.ego_speed_ms), float(fwd.y * self.ego_speed_ms), 0.0)) + self.world.tick() + if tick % 80 == 0 and self.adv_vehicle: + apply_brake(self.adv_vehicle, 0.12) + if tick % 10 == 0: self._record_frame(tick) + if self.collision_sensor and self.collision_sensor.collided: break diff --git a/critical/jaywalking.py b/critical/jaywalking.py new file mode 100644 index 00000000..2cc9aabb --- /dev/null +++ b/critical/jaywalking.py @@ -0,0 +1,53 @@ +# 场景8: 行人闯红灯 — 晴天白天 ego40绿灯 行人从红绿灯旁人行道闯红灯横穿 +import carla +from scenarios.base_scenario import BaseScenario +from utils.carla_utils import spawn_pedestrian_at, walk_to_location, apply_brake + + +class JaywalkingScenario(BaseScenario): + def __init__(self): + super().__init__() + self.name = "jaywalking" + self.category = "pedestrian_danger" + self.weather = carla.WeatherParameters( + cloudiness=5.0, precipitation=0.0, precipitation_deposits=0.0, + wind_intensity=0.0, fog_density=0.0, fog_distance=0.0, + wetness=0.0, sun_azimuth_angle=90.0, sun_altitude_angle=45.0) + self.ego_speed_ms = 40.0 / 3.6 + + def get_env_config(self): + cfg = super().get_env_config() + cfg["action_space"] = 2 + return cfg + + def _spawn_actors(self): + self._spawn_ego() + self._spawn_pedestrian() + self.world.tick() + + def _spawn_scenario_actors_impl(self): + self._spawn_pedestrian() + + def _spawn_pedestrian(self): + ego_loc = self.ego_vehicle.get_location() + # 行人从红绿灯旁左侧人行道闯出 + ped_loc = carla.Location(x=ego_loc.x + 18, y=ego_loc.y + 6.0, z=ego_loc.z) + ped, ctrl = spawn_pedestrian_at(self.world, ped_loc, speed_ms=1.8) + # 闯红灯横穿到对面 + walk_to_location(ctrl, self.world, + carla.Location(x=ped_loc.x, y=ego_loc.y - 6.0, z=ped_loc.z)) + self.pedestrians.append((ped, ctrl)) + + def _control_loop(self): + braked = False + for tick in range(int(60 * 20)): + if not self._running: break + if braked: + self.ego_vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.8, steer=0.0)) + else: + self.ego_vehicle.apply_control(carla.VehicleControl(throttle=0.3, steer=0.0)) + self.world.tick() + if tick == int(2.5 * 20): + braked = True + if tick % 5 == 0: self._record_frame(tick) + if self.collision_sensor and self.collision_sensor.collided: break diff --git a/critical/pedestrian_cross.py b/critical/pedestrian_cross.py new file mode 100644 index 00000000..b0c5c5ed --- /dev/null +++ b/critical/pedestrian_cross.py @@ -0,0 +1,152 @@ +# 场景6: 行人横穿 — 慢速横穿, 在车头前停留1s, 自车急刹同时行人暂停 +import carla +from scenarios.base_scenario import BaseScenario +from utils.carla_utils import spawn_pedestrian_at + + +class PedestrianCrossScenario(BaseScenario): + def __init__(self): + super().__init__() + self.name = "pedestrian_cross" + self.category = "pedestrian_danger" + self.weather = carla.WeatherParameters( + cloudiness=5.0, precipitation=0.0, precipitation_deposits=0.0, + wind_intensity=0.0, fog_density=0.0, fog_distance=0.0, + wetness=0.0, sun_azimuth_angle=90.0, sun_altitude_angle=45.0) + self.ego_speed_ms = 30.0 / 3.6 + self._trigger_step = int(2.5 * 20) # 2.5s 开始横穿 + self._half_cross_time = int(2.0 * 20) # 半程 2s(慢速) + self._pause_time = int(1.0 * 20) # 车头前停留 1s + + def get_env_config(self): + cfg = super().get_env_config() + cfg["action_space"] = 2 + return cfg + + def _spawn_actors(self): + self._spawn_ego() + self._spawn_pedestrian() + self.world.tick() + + def _spawn_scenario_actors_impl(self): + self._phase = "idle" # idle → first_half → paused → second_half → done + self._phase_start = 0 + self._ped_start_y = None + self._spawn_pedestrian() + + def _spawn_pedestrian(self): + ego_tf = self.ego_vehicle.get_transform() + ego_loc = ego_tf.location + fwd = ego_tf.get_forward_vector() + right = ego_tf.get_right_vector() + + ped_x = ego_loc.x + fwd.x * 30 + right.x * 6.0 + ped_y = ego_loc.y + fwd.y * 30 + right.y * 6.0 + self._ped_yaw = ego_tf.rotation.yaw - 90.0 + + ped_loc = carla.Location(x=ped_x, y=ped_y, z=ego_loc.z + 0.5) + ped, ctrl = spawn_pedestrian_at(self.world, ped_loc, speed_ms=0.0) + ped.set_transform(carla.Transform(ped_loc, carla.Rotation(yaw=self._ped_yaw))) + self.pedestrians.append((ped, ctrl)) + self._ped = ped + self._start_y = ped_y # 右侧人行道 + self._lane_y = ego_loc.y # 自车车道中心(停留点) + self._target_y = ego_loc.y - right.y * 6.0 # 对侧人行道 + + # ================================================================ + # RL 回调 + # ================================================================ + def step_callback(self, step_count): + if self._ped is None: + return + + # 触发横穿 + if step_count >= self._trigger_step and self._phase == "idle": + self._phase = "first_half" + self._phase_start = step_count + self._ped_start_y = self._ped.get_location().y + + if self._phase == "first_half": + self._move_ped(step_count, self._ped_start_y, self._lane_y, + self._half_cross_time, "paused") + + elif self._phase == "paused": + if step_count - self._phase_start >= self._pause_time: + self._phase = "second_half" + self._phase_start = step_count + self._ped_start_y = self._ped.get_location().y + + elif self._phase == "second_half": + self._move_ped(step_count, self._ped_start_y, self._target_y, + self._half_cross_time, "done") + + def _move_ped(self, step_count, start_y, end_y, duration, next_phase): + """线性位移行人 + 到达后切状态""" + elapsed = step_count - self._phase_start + if elapsed >= duration: + self._set_ped_y(end_y) + self._phase = next_phase + self._phase_start = step_count + else: + t = elapsed / duration + self._set_ped_y(start_y + (end_y - start_y) * t) + + def _set_ped_y(self, y): + loc = self._ped.get_location() + loc.y = y + self._ped.set_transform(carla.Transform(loc, carla.Rotation(yaw=self._ped_yaw))) + + # ================================================================ + # 手动模式 + # ================================================================ + def _control_loop(self): + phase, phase_start = "idle", 0 + ped_start_y = None + ego_braked = False + + for tick in range(int(60 * 20)): + if not self._running: break + + self.ego_vehicle.apply_control( + carla.VehicleControl(throttle=0.0, brake=0.8, steer=0.0) + if ego_braked else carla.VehicleControl(throttle=0.25, steer=0.0)) + self.world.tick() + + # 触发 + if tick >= self._trigger_step and phase == "idle" and self._ped: + phase, phase_start = "first_half", tick + ped_start_y = self._ped.get_location().y + ego_braked = True # 自车发现行人,立即急刹 + + # 前半程 → 车头前暂停 + if phase == "first_half" and self._ped: + elapsed = tick - phase_start + if elapsed >= self._half_cross_time: + self._set_ped_y(self._lane_y) + phase, phase_start = "paused", tick + else: + t = elapsed / self._half_cross_time + self._set_ped_y(ped_start_y + (self._lane_y - ped_start_y) * t) + + # 停留 1s + elif phase == "paused": + if tick - phase_start >= self._pause_time: + phase, phase_start = "second_half", tick + ped_start_y = self._ped.get_location().y + + # 后半程 → 对侧 + elif phase == "second_half" and self._ped: + elapsed = tick - phase_start + if elapsed >= self._half_cross_time: + self._set_ped_y(self._target_y) + phase = "done" + else: + t = elapsed / self._half_cross_time + self._set_ped_y(ped_start_y + (self._target_y - ped_start_y) * t) + + if tick % 5 == 0: self._record_frame(tick) + if self.collision_sensor and self.collision_sensor.collided: break + + def cleanup(self): + self._ped = None + super().cleanup() diff --git a/critical/rain_storm.py b/critical/rain_storm.py new file mode 100644 index 00000000..6974a1bd --- /dev/null +++ b/critical/rain_storm.py @@ -0,0 +1,68 @@ +# 场景1: 暴雨跟车 — 雨90% 积水60% 自车匀速跟车 前车偶有减速 +import carla +from scenarios.base_scenario import BaseScenario +from utils.carla_utils import apply_brake + + +class RainStormScenario(BaseScenario): + def __init__(self): + super().__init__() + self.name = "rain_storm" + self.category = "extreme_weather" + self.weather = carla.WeatherParameters( + cloudiness=95.0, precipitation=90.0, precipitation_deposits=60.0, + wind_intensity=30.0, fog_density=20.0, fog_distance=100.0, + wetness=60.0, sun_azimuth_angle=90.0, sun_altitude_angle=45.0) + self.ego_speed_ms = 45.0 / 3.6 + self.adv_speed_ms = 40.0 / 3.6 + + def get_env_config(self): + cfg = super().get_env_config() + cfg["action_space"] = 2 + cfg["brake_mode"] = "coast" # 松油门滑行,永不刹停 + return cfg + + def _spawn_actors(self): + self._spawn_ego() + self._spawn_adv_in_front(20.0, self.adv_speed_ms) + self.world.tick() + + def _spawn_scenario_actors_impl(self): + self._spawn_adv_in_front(20.0, self.adv_speed_ms) + + # ================================================================ + # RL 回调:前车周期性减速,测试自车跟车反应 + # ================================================================ + def step_callback(self, step_count): + """每 ~3 秒前车轻微减速一次,自车需感知并调整距离""" + if self.adv_vehicle is None: + return + cycle = step_count % 60 # 3s 周期 (60 步 @20fps) + if cycle == 0: + # 前车减速到 35 km/h + fwd = self.adv_vehicle.get_transform().get_forward_vector() + self.adv_vehicle.set_target_velocity(carla.Vector3D( + float(fwd.x * 35.0 / 3.6), float(fwd.y * 35.0 / 3.6), 0.0)) + elif cycle == 20: + # 恢复到 40 km/h + fwd = self.adv_vehicle.get_transform().get_forward_vector() + self.adv_vehicle.set_target_velocity(carla.Vector3D( + float(fwd.x * self.adv_speed_ms), + float(fwd.y * self.adv_speed_ms), 0.0)) + + # ================================================================ + # 手动模式 + # ================================================================ + def _control_loop(self): + for tick in range(int(60 * 20)): + if not self._running: break + fwd = self.ego_vehicle.get_transform().get_forward_vector() + self.ego_vehicle.set_target_velocity(carla.Vector3D( + float(fwd.x * self.ego_speed_ms), + float(fwd.y * self.ego_speed_ms), 0.0)) + self.world.tick() + # 前车周期性轻微减速 + if tick % 60 == 0 and self.adv_vehicle: + apply_brake(self.adv_vehicle, 0.15) + if tick % 10 == 0: self._record_frame(tick) + if self.collision_sensor and self.collision_sensor.collided: break diff --git a/critical/tunnel_night.py b/critical/tunnel_night.py new file mode 100644 index 00000000..6e73a7ed --- /dev/null +++ b/critical/tunnel_night.py @@ -0,0 +1,31 @@ +# 场景3: 夜间黑暗行驶 — 晴天 夜间亮度5% ego40 无其他车辆 无行人 +import carla +from scenarios.base_scenario import BaseScenario + + +class TunnelNightScenario(BaseScenario): + def __init__(self): + super().__init__() + self.name = "tunnel_night" + self.category = "extreme_weather" + self.weather = carla.WeatherParameters( + cloudiness=10.0, precipitation=0.0, precipitation_deposits=0.0, + wind_intensity=0.0, fog_density=0.0, fog_distance=0.0, + wetness=0.0, sun_azimuth_angle=180.0, sun_altitude_angle=-90.0) + self.ego_speed_ms = 40.0 / 3.6 + + def get_env_config(self): + cfg = super().get_env_config() + cfg["action_space"] = 2 + cfg["brake_mode"] = "coast" + return cfg + + def _control_loop(self): + for tick in range(int(60 * 20)): + if not self._running: break + fwd = self.ego_vehicle.get_transform().get_forward_vector() + self.ego_vehicle.set_target_velocity(carla.Vector3D( + float(fwd.x * self.ego_speed_ms), float(fwd.y * self.ego_speed_ms), 0.0)) + self.world.tick() + if tick % 10 == 0: self._record_frame(tick) + if self.collision_sensor and self.collision_sensor.collided: break