# CARLA Python API - Lab4

Lab goals:
- build engaging sensor-based mini projects;
- keep scripts robust with explicit cleanup.

## CARLA documentation links
- Main docs: https://carla.readthedocs.io/en/latest/
- Sensors: https://carla.readthedocs.io/en/latest/ref_sensors/
- Python API: https://carla.readthedocs.io/en/latest/python_api/
- Debug helper: https://carla.readthedocs.io/en/latest/python_api/#carladebughelper
- Vehicle lights: https://carla.readthedocs.io/en/latest/python_api/#carlavehiclelightstate


In [None]:
import carla, time, random, cv2
import numpy as np


In [None]:
client = carla.Client("localhost", 2000)
client.set_timeout(10.0)
world = client.get_world()
spectator = world.get_spectator()


## 1. Shared helper functions


In [None]:
def move_spectator_to(transform, spectator, distance=7.0, z=3.0, pitch=-15.0):
    back = transform.location - transform.get_forward_vector() * distance
    loc = carla.Location(back.x, back.y, back.z + z)
    rot = carla.Rotation(pitch=pitch, yaw=transform.rotation.yaw, roll=0.0)
    spectator.set_transform(carla.Transform(loc, rot))


def spawn_vehicle(world, spawn_index=0, vehicle_filter="vehicle.tesla.model3", autopilot=False):
    points = world.get_map().get_spawn_points()
    if not points:
        raise RuntimeError("No spawn points found")
    bps = world.get_blueprint_library().filter(vehicle_filter)
    if not bps:
        bps = world.get_blueprint_library().filter("vehicle.*")
    for k in range(len(points)):
        actor = world.try_spawn_actor(random.choice(bps), points[(spawn_index + k) % len(points)])
        if actor is not None:
            actor.set_autopilot(autopilot)
            return actor
    raise RuntimeError("Could not spawn vehicle")


def spawn_vehicle_ahead(world, ref_vehicle, distances=(20.0, 28.0, 36.0)):
    wp = world.get_map().get_waypoint(ref_vehicle.get_transform().location, project_to_road=True, lane_type=carla.LaneType.Driving)
    bps = world.get_blueprint_library().filter("vehicle.*")
    for d in distances:
        cands = wp.next(float(d))
        random.shuffle(cands)
        for c in cands:
            actor = world.try_spawn_actor(random.choice(bps), c.transform)
            if actor is not None:
                return actor
    raise RuntimeError("Could not spawn target ahead")


def spawn_camera(world, attach_to, transform, width=640, height=360, fov=95, tick=0.05):
    bp = world.get_blueprint_library().find("sensor.camera.rgb")
    bp.set_attribute("image_size_x", str(width))
    bp.set_attribute("image_size_y", str(height))
    bp.set_attribute("fov", str(fov))
    bp.set_attribute("sensor_tick", str(tick))
    return world.spawn_actor(bp, transform, attach_to=attach_to)


def spawn_lidar(world, attach_to, transform):
    bp = world.get_blueprint_library().find("sensor.lidar.ray_cast")
    bp.set_attribute("channels", "32")
    bp.set_attribute("points_per_second", "56000")
    bp.set_attribute("rotation_frequency", "20")
    bp.set_attribute("range", "35")
    return world.spawn_actor(bp, transform, attach_to=attach_to)


def image_to_bgr(image):
    arr = np.frombuffer(image.raw_data, dtype=np.uint8)
    arr = np.reshape(arr, (image.height, image.width, 4))
    return arr[:, :, :3][:, :, ::-1].copy()


def safe_destroy(actors):
    for a in actors:
        if a is not None:
            try:
                a.destroy()
            except RuntimeError:
                pass


## 2. Exercises with solutions


### Exercise 1: Streamer cockpit (4 cameras)
Docs: https://carla.readthedocs.io/en/latest/ref_sensors/#rgb-camera

#### Solution


In [None]:
vehicle = None
cameras = []
frames = {}
res = (480, 270)

specs = {
    "front": carla.Transform(carla.Location(x=1.8, z=1.4), carla.Rotation(pitch=-6.0)),
    "left": carla.Transform(carla.Location(x=0.1, y=-0.75, z=1.3), carla.Rotation(yaw=-145.0)),
    "right": carla.Transform(carla.Location(x=0.1, y=0.75, z=1.3), carla.Rotation(yaw=145.0)),
    "rear": carla.Transform(carla.Location(x=-2.1, z=1.3), carla.Rotation(yaw=180.0)),
}


def make_cb(name):
    def _cb(image):
        frames[name] = image_to_bgr(image)
    return _cb


try:
    vehicle = spawn_vehicle(world, spawn_index=12, autopilot=True)

    for name, tf in specs.items():
        cam = spawn_camera(world, vehicle, tf, width=res[0], height=res[1])
        cameras.append(cam)
        frames[name] = np.zeros((res[1], res[0], 3), dtype=np.uint8)
        cam.listen(make_cb(name))
        cv2.namedWindow("Ex1-" + name, cv2.WINDOW_AUTOSIZE)

    while True:
        for name in specs.keys():
            cv2.imshow("Ex1-" + name, frames[name])
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
        move_spectator_to(vehicle.get_transform(), spectator)
        world.tick()
        time.sleep(0.02)
finally:
    for cam in cameras:
        cam.stop()
    cv2.destroyAllWindows()
    safe_destroy(cameras + [vehicle])


### Exercise 2: Auto headlights from image brightness
Docs: https://carla.readthedocs.io/en/latest/python_api/#carlavehiclelightstate

#### Solution


In [None]:
vehicle = None
camera = None
state = {"brightness": 255.0, "lights_on": False}
turn_on = 70.0
turn_off = 95.0

try:
    vehicle = spawn_vehicle(world, spawn_index=18, autopilot=True)
    camera = spawn_camera(
        world,
        vehicle,
        carla.Transform(carla.Location(x=1.5, z=1.5), carla.Rotation(pitch=-5.0)),
        width=640,
        height=360,
        tick=0.1,
    )

    def on_image(image):
        gray = cv2.cvtColor(image_to_bgr(image), cv2.COLOR_BGR2GRAY)
        state["brightness"] = float(np.mean(gray))

    camera.listen(on_image)
    last_print = 0.0

    while True:
        b = state["brightness"]
        if b < turn_on and not state["lights_on"]:
            mask = carla.VehicleLightState.Position | carla.VehicleLightState.LowBeam
            vehicle.set_light_state(carla.VehicleLightState(mask))
            state["lights_on"] = True
        elif b > turn_off and state["lights_on"]:
            vehicle.set_light_state(carla.VehicleLightState.NONE)
            state["lights_on"] = False

        if time.time() - last_print > 1.0:
            print("brightness={:.1f} lights_on={}".format(b, state["lights_on"]))
            last_print = time.time()

        move_spectator_to(vehicle.get_transform(), spectator, distance=8.5)
        world.tick()
        time.sleep(0.03)
except KeyboardInterrupt:
    pass
finally:
    if camera is not None:
        camera.stop()
    safe_destroy([camera, vehicle])


### Exercise 3: GPS Pacman portals
Docs: https://carla.readthedocs.io/en/latest/ref_sensors/#gnss-sensor

#### Solution


In [None]:
vehicle = None
gnss = None
state = {"lat": None, "lon": None}
X_MIN = -110.0
X_MAX = 110.0

try:
    vehicle = spawn_vehicle(world, spawn_index=4, autopilot=False)
    gnss_bp = world.get_blueprint_library().find("sensor.other.gnss")
    gnss = world.spawn_actor(gnss_bp, carla.Transform(carla.Location(x=0.5, z=1.8)), attach_to=vehicle)

    def on_gnss(event):
        state["lat"] = event.latitude
        state["lon"] = event.longitude

    gnss.listen(on_gnss)

    ctrl = carla.VehicleControl(throttle=0.45, steer=0.0, brake=0.0)
    last_print = 0.0

    while True:
        vehicle.apply_control(ctrl)
        tf = vehicle.get_transform()
        loc = tf.location

        if loc.x > X_MAX:
            loc2 = carla.Location(X_MIN, loc.y, loc.z + 0.2)
            vehicle.set_transform(carla.Transform(loc2, tf.rotation))
            world.debug.draw_string(loc2 + carla.Location(z=1.5), "PORTAL!", life_time=2.0, color=carla.Color(255, 200, 0))

        if time.time() - last_print > 1.0 and state["lat"] is not None:
            print("lat={:.6f} lon={:.6f} x={:.2f}".format(state["lat"], state["lon"], loc.x))
            last_print = time.time()

        move_spectator_to(vehicle.get_transform(), spectator, distance=8.0, z=3.0)
        world.tick()
        time.sleep(0.03)
except KeyboardInterrupt:
    pass
finally:
    if gnss is not None:
        gnss.stop()
    safe_destroy([gnss, vehicle])


### Optional Exercise 4: LIDAR safety bubble
Docs: https://carla.readthedocs.io/en/latest/ref_sensors/#lidar-sensor

#### Solution


In [None]:
vehicle = None
target = None
lidar = None
state = {"d_front": None}

try:
    vehicle = spawn_vehicle(world, spawn_index=25, autopilot=False)
    target = spawn_vehicle_ahead(world, vehicle, distances=(24.0, 30.0, 36.0))
    target.apply_control(carla.VehicleControl(throttle=0.0, brake=1.0, hand_brake=True))

    lidar = spawn_lidar(world, vehicle, carla.Transform(carla.Location(x=1.8, z=2.2)))

    def on_lidar(measurement):
        pts = np.frombuffer(measurement.raw_data, dtype=np.float32)
        pts = np.reshape(pts, (-1, 4))
        x = pts[:, 0]
        y = pts[:, 1]
        m = (x > 0.0) & (x < 20.0) & (np.abs(y) < 2.5)
        if np.any(m):
            d = np.sqrt(x[m] ** 2 + y[m] ** 2)
            state["d_front"] = float(np.min(d))
        else:
            state["d_front"] = None

    lidar.listen(on_lidar)

    while True:
        d = state["d_front"]
        ctrl = carla.VehicleControl()
        if d is not None and d < 6.0:
            ctrl.throttle = 0.0
            ctrl.brake = 1.0
        elif d is not None and d < 10.0:
            ctrl.throttle = 0.15
            ctrl.brake = 0.25
        else:
            ctrl.throttle = 0.35
            ctrl.brake = 0.0
        vehicle.apply_control(ctrl)

        if d is not None:
            world.debug.draw_string(
                vehicle.get_transform().location + carla.Location(z=2.2),
                "d_front={:.2f} m".format(d),
                life_time=0.08,
                color=carla.Color(255, 255, 0),
            )

        move_spectator_to(vehicle.get_transform(), spectator)
        world.tick()
        time.sleep(0.03)
except KeyboardInterrupt:
    pass
finally:
    if lidar is not None:
        lidar.stop()
    safe_destroy([lidar, target, vehicle])
