In [None]:
import math
import random
import sys
import weakref
import argparse

In [None]:
import carla

In [None]:
# Module-level noise constants. Neither name is referenced in the code visible
# in this part of the file.
# NOTE(review): presumably intended as the standard deviation / bias values for
# the sensor noise attributes -- confirm units and intended use before relying
# on them.
NOISE_STDDEV = 5e-5
NOISE_BIAS = 1e-5

In [None]:
class GnssSensor:
    """GNSS sensor attached to a parent actor that caches the latest reading.

    Spawns a ``sensor.other.gnss`` actor attached to ``parent_actor`` and
    registers a listener that stores the most recent latitude / longitude in
    ``self.lat`` and ``self.lon`` (both start at ``0.0``).

    Args:
        parent_actor: Actor the sensor is attached to; its ``get_world()`` is
            used to obtain the blueprint library and to spawn the sensor.
        transform: Passed unchanged to ``world.spawn_actor`` as the sensor
            transform.
        sensor_tick: Sensor capture rate setting (CARLA documents it as the
            simulation seconds between captures).
        noise_alt_bias: Mean of the altitude noise model.
        noise_alt_stddev: Standard deviation of the altitude noise model.
        noise_lat_bias: Mean of the latitude noise model.
        noise_lat_stddev: Standard deviation of the latitude noise model.
        noise_lon_bias: Mean of the longitude noise model.
        noise_lon_stddev: Standard deviation of the longitude noise model.

    Any attribute left as ``None`` is not set, so the blueprint keeps its own
    default for it.
    """

    def __init__(
        self,
        parent_actor,
        transform,
        sensor_tick=0.1,
        noise_alt_bias=None,
        noise_alt_stddev=None,
        noise_lat_bias=None,
        noise_lat_stddev=None,
        noise_lon_bias=None,
        noise_lon_stddev=None
    ):
        self._parent = parent_actor
        self.lat = 0.0
        self.lon = 0.0
        world = self._parent.get_world()
        bp = world.get_blueprint_library().find("sensor.other.gnss")
        attributes = {
            "sensor_tick": sensor_tick,
            "noise_alt_bias": noise_alt_bias,
            "noise_alt_stddev": noise_alt_stddev,
            "noise_lat_bias": noise_lat_bias,
            "noise_lat_stddev": noise_lat_stddev,
            "noise_lon_bias": noise_lon_bias,
            "noise_lon_stddev": noise_lon_stddev,
        }
        for name, value in attributes.items():
            # Compare against None (not truthiness) so an explicit 0.0 is still
            # forwarded, and convert to str because blueprint attribute values
            # are passed to CARLA as strings.
            if value is not None:
                bp.set_attribute(name, str(value))
        self.sensor = world.spawn_actor(blueprint=bp, transform=transform, attach_to=self._parent)
        # The listener only holds a weak reference so that the callback does
        # not keep this wrapper object alive.
        weak_self = weakref.ref(self)
        self.sensor.listen(lambda event: GnssSensor._gnss_callback(weak_self, event))

    @staticmethod
    def _gnss_callback(weak_self, event):
        """Store the latitude / longitude of ``event`` on the live instance.

        Does nothing when the instance behind ``weak_self`` is already gone.
        """
        self = weak_self()
        if self is None:
            return
        self.lat = event.latitude
        self.lon = event.longitude

In [None]:
class IMUSensor:
    """IMU sensor attached to a parent actor that caches the latest reading.

    Spawns a ``sensor.other.imu`` actor attached to ``parent_actor`` and
    registers a listener that updates:

    * ``self.accelerometer`` -- ``(x, y, z)`` tuple, each component clamped
      to ``[-99.9, 99.9]``;
    * ``self.gyroscope`` -- ``(x, y, z)`` tuple, each component passed
      through ``math.degrees`` and then clamped to ``[-99.9, 99.9]``;
    * ``self.compass`` -- the reading passed through ``math.degrees``
      (not clamped).

    Args:
        parent_actor: Actor the sensor is attached to; its ``get_world()`` is
            used to obtain the blueprint library and to spawn the sensor.
        transform: Passed unchanged to ``world.spawn_actor`` as the sensor
            transform.
        sensor_tick: Sensor capture rate setting (CARLA documents it as the
            simulation seconds between captures).
        noise_accel_stddev_x: Std. deviation of the accelerometer noise (X axis).
        noise_accel_stddev_y: Std. deviation of the accelerometer noise (Y axis).
        noise_accel_stddev_z: Std. deviation of the accelerometer noise (Z axis).
        noise_gyro_bias_x: Mean of the gyroscope noise model (X axis).
        noise_gyro_bias_y: Mean of the gyroscope noise model (Y axis).
        noise_gyro_bias_z: Mean of the gyroscope noise model (Z axis).
        noise_gyro_stddev_x: Std. deviation of the gyroscope noise (X axis).
        noise_gyro_stddev_y: Std. deviation of the gyroscope noise (Y axis).
        noise_gyro_stddev_z: Std. deviation of the gyroscope noise (Z axis).

    Any attribute left as ``None`` is not set, so the blueprint keeps its own
    default for it.
    """

    def __init__(
        self,
        parent_actor,
        transform,
        sensor_tick=0.01,
        noise_accel_stddev_x=None,
        noise_accel_stddev_y=None,
        noise_accel_stddev_z=None,
        noise_gyro_bias_x=None,
        noise_gyro_bias_y=None,
        noise_gyro_bias_z=None,
        noise_gyro_stddev_x=None,
        noise_gyro_stddev_y=None,
        noise_gyro_stddev_z=None
    ):
        self._parent = parent_actor
        self.accelerometer = (0.0, 0.0, 0.0)
        self.gyroscope = (0.0, 0.0, 0.0)
        self.compass = 0.0
        world = self._parent.get_world()
        bp = world.get_blueprint_library().find("sensor.other.imu")
        attributes = {
            "sensor_tick": sensor_tick,
            "noise_accel_stddev_x": noise_accel_stddev_x,
            "noise_accel_stddev_y": noise_accel_stddev_y,
            "noise_accel_stddev_z": noise_accel_stddev_z,
            "noise_gyro_bias_x": noise_gyro_bias_x,
            "noise_gyro_bias_y": noise_gyro_bias_y,
            "noise_gyro_bias_z": noise_gyro_bias_z,
            "noise_gyro_stddev_x": noise_gyro_stddev_x,
            "noise_gyro_stddev_y": noise_gyro_stddev_y,
            "noise_gyro_stddev_z": noise_gyro_stddev_z,
        }
        for name, value in attributes.items():
            # Compare against None (not truthiness) so an explicit 0.0 is still
            # forwarded, and convert to str because blueprint attribute values
            # are passed to CARLA as strings.
            if value is not None:
                bp.set_attribute(name, str(value))
        self.sensor = world.spawn_actor(blueprint=bp, transform=transform, attach_to=self._parent)
        # The listener only holds a weak reference so that the callback does
        # not keep this wrapper object alive.
        weak_self = weakref.ref(self)
        self.sensor.listen(lambda sensor_data: IMUSensor._imu_callback(weak_self, sensor_data))

    @staticmethod
    def _imu_callback(weak_self, sensor_data):
        """Store the clamped / converted IMU reading on the live instance.

        Does nothing when the instance behind ``weak_self`` is already gone.
        """
        self = weak_self()
        if self is None:
            return
        lower, upper = -99.9, 99.9

        def clamp(value):
            return max(lower, min(upper, value))

        accel = sensor_data.accelerometer
        gyro = sensor_data.gyroscope
        self.accelerometer = (clamp(accel.x), clamp(accel.y), clamp(accel.z))
        self.gyroscope = (
            clamp(math.degrees(gyro.x)),
            clamp(math.degrees(gyro.y)),
            clamp(math.degrees(gyro.z)),
        )
        self.compass = math.degrees(sensor_data.compass)

In [None]:
class Vehicle:
    """Ego vehicle together with its attached GNSS and IMU sensors.

    Spawns the first blueprint matching ``"model3"`` at ``spawn_point``, sets
    its autopilot from ``args.autopilot`` and attaches one ``GnssSensor`` and
    one ``IMUSensor`` at the vehicle origin.

    Attributes:
        world: The world the actors were spawned in.
        vehicle: The spawned vehicle actor.
        gnss: The ``GnssSensor`` wrapper attached to the vehicle.
        imu: The ``IMUSensor`` wrapper attached to the vehicle.
        actor_list: ``[vehicle, gnss.sensor, imu.sensor]`` -- every actor this
            object spawned.

    Args:
        world: World used to look up blueprints and spawn the vehicle.
        spawn_point: Transform passed to ``world.spawn_actor`` for the vehicle.
        args: Object providing the boolean ``autopilot`` attribute.
    """

    def __init__(self, world, spawn_point, args):
        self.world = world
        # Initialise the vehicle.
        bp_lib = world.get_blueprint_library()
        vehicle_bp = bp_lib.filter("model3")[0]
        self.vehicle = world.spawn_actor(vehicle_bp, spawn_point)
        self.gnss = None
        self.imu = None
        self.actor_list = [self.vehicle]
        try:
            self.vehicle.set_autopilot(args.autopilot)
            # Add GNSS and IMU. spawn_actor expects a carla.Transform, so the
            # mounting location is wrapped rather than passed as a bare
            # carla.Location.
            self.gnss = GnssSensor(
                parent_actor=self.vehicle,
                transform=carla.Transform(carla.Location(x=0, z=0)),
            )
            self.actor_list.append(self.gnss.sensor)
            self.imu = IMUSensor(
                parent_actor=self.vehicle,
                transform=carla.Transform(carla.Location(x=0, z=0)),
            )
            self.actor_list.append(self.imu.sensor)
        except Exception:
            # Do not leave half-built actors behind on the server: destroy
            # whatever was spawned so far (sensors before the vehicle), then
            # let the caller see the original error.
            for actor in reversed(self.actor_list):
                actor.destroy()
            raise

In [None]:
class World:
    """Client-side wrapper around a CARLA world that owns the ego vehicle.

    Attributes:
        world: The wrapped simulator world.
        args: The parsed options object given to the constructor; forwarded
            to ``Vehicle`` when (re)spawning.
        sync: Copy of ``args.sync``; selects ``tick()`` vs ``wait_for_tick()``.
        recording_enabled: Always initialised to ``False`` here; nothing in
            this class turns recording on.
        vehicle: The spawned ``Vehicle`` or ``None``.
        map: Result of ``world.get_map()``.

    Args:
        carla_world: The simulator world to wrap.
        args: Object providing ``sync`` (and whatever ``Vehicle`` reads).

    The constructor exits the process with status 1 when the map cannot be
    retrieved (``RuntimeError``) or when the map has no spawn points.
    """

    def __init__(self, carla_world, args):
        self.world = carla_world
        # Keep the options around: restart() needs them to build the Vehicle.
        self.args = args
        self.sync = args.sync
        self.recording_enabled = False
        self.vehicle = None
        try:
            self.map = self.world.get_map()
        except RuntimeError as error:
            print(f"RuntimeError: {error}")
            print("  The server could not send the OpenDRIVE (.xodr) file:")
            print("  Make sure it exists, has the same name of your town, and is correct.")
            sys.exit(1)
        self.restart()

    def restart(self):
        """Spawn the ego vehicle at a random spawn point if none exists yet.

        Afterwards advances the simulation once: ``tick()`` in synchronous
        mode, ``wait_for_tick()`` otherwise.
        """
        while self.vehicle is None:
            spawn_points = self.map.get_spawn_points()
            if not spawn_points:
                print("There are no spawn points available in your map/town.")
                print("Please add some Vehicle Spawn Point to your UE4 scene.")
                sys.exit(1)
            spawn_point = random.choice(spawn_points)
            self.vehicle = Vehicle(self.world, spawn_point, self.args)

        if self.sync:
            self.world.tick()
        else:
            self.world.wait_for_tick()

    def destroy(self):
        """Destroy every actor owned by the current vehicle, if any.

        Actors are destroyed in reverse ``actor_list`` order and
        ``self.vehicle`` is reset to ``None``, so calling this twice is
        harmless.
        """
        if self.vehicle is None:
            return
        for actor in reversed(self.vehicle.actor_list):
            if actor is not None:
                actor.destroy()
        self.vehicle = None

In [None]:
def game_loop(args):
    """Connect to the CARLA server, build the world and tick until interrupted.

    Args:
        args: Parsed options providing ``host``, ``port`` and ``sync`` (plus
            whatever ``World`` reads from it).

    In synchronous mode the world settings are switched to synchronous with a
    fixed step of 0.05 s (unless the server is already synchronous) and the
    traffic manager is put in synchronous mode; the original settings are
    restored on exit. The function only returns by way of an exception
    (e.g. ``KeyboardInterrupt``); the cleanup in ``finally`` always runs.
    """
    sim_world, world, client, original_settings = None, None, None, None
    traffic_manager = None

    try:
        # Connect the client to CARLA.
        client = carla.Client(args.host, args.port)
        client.set_timeout(20.0)
        # Initialise the world.
        sim_world = client.get_world()
        if args.sync:
            original_settings = sim_world.get_settings()
            settings = sim_world.get_settings()
            if not settings.synchronous_mode:
                settings.synchronous_mode = True
                settings.fixed_delta_seconds = 0.05
            sim_world.apply_settings(settings)

            traffic_manager = client.get_trafficmanager()
            traffic_manager.set_synchronous_mode(True)
        world = World(sim_world, args)
        if args.sync:
            sim_world.tick()
        else:
            sim_world.wait_for_tick()

        while True:
            if args.sync:
                sim_world.tick()
            else:
                # Block until the server produces the next frame instead of
                # spinning in an empty loop.
                sim_world.wait_for_tick()
    finally:
        if traffic_manager is not None:
            # Undo the synchronous mode enabled above.
            # NOTE(review): this assumes the traffic manager was asynchronous
            # before this client started -- confirm for shared servers.
            traffic_manager.set_synchronous_mode(False)
        if original_settings is not None:
            sim_world.apply_settings(original_settings)
        if world is not None:
            # Look the optional members up defensively so that a missing
            # attribute during cleanup cannot mask the exception that ended
            # the loop.
            if getattr(world, "recording_enabled", False):
                client.stop_recorder()
            destroy = getattr(world, "destroy", None)
            if callable(destroy):
                destroy()
            elif world.vehicle is not None:
                # No destroy() available: remove the spawned actors directly,
                # last-spawned first.
                for actor in reversed(world.vehicle.actor_list):
                    actor.destroy()

In [None]:
def main():
    """Parse the command-line options and run the game loop.

    Recognised options: ``--host``, ``--port``, ``--sync`` and
    ``--autopilot``. A ``KeyboardInterrupt`` raised while the loop is running
    is caught and reported with a short farewell message.
    """
    parser = argparse.ArgumentParser(description="CARLA Client")
    # (flag, keyword arguments) pairs, registered in this order.
    option_table = (
        ("--host", {"default": "127.0.0.1", "help": "IP of the host server (default: 127.0.0.1)"}),
        ("--port", {"default": 2000, "type": int, "help": "TCP port to listen to (default: 2000)"}),
        ("--sync", {"action": "store_true", "help": "Activate synchronous mode execution"}),
        ("--autopilot", {"action": "store_true", "help": "enable autopilot"}),
    )
    for flag, options in option_table:
        parser.add_argument(flag, **options)
    cli_args = parser.parse_args()
    try:
        game_loop(cli_args)
    except KeyboardInterrupt:
        print("\nCancelled by user. Bye!")