In [49]:
import sys
import os
BASE_DIR = os.path.dirname(os.path.abspath("__file__"))
sys.path.append(os.path.join(BASE_DIR, '../'))

In [50]:
import carla
import numpy as np
import cv2
import time

from IPython.display import clear_output, Image
import ipywidgets as widgets
import threading
import math
import logging
import random

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

In [51]:
from inference.predict_steering import predict_steering_angle

In [52]:
host_ip_address = "192.168.1.71"
img_widget = widgets.Image(format='png')


# Add a global variables
simulation_running = False
camera = None
vehicle = None


In [53]:
# vehicle speed
speed_control_active = False
target_speed = 8 # Desired speed in km/h
inference_started = False

In [54]:
def process_image(image):
    img_array = np.array(image.raw_data)
    img_rgb = img_array.reshape((image.height, image.width, 4))
    img_rgb = img_rgb[:, :, :3]
    img_bgr = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2BGR)
    img_jpeg = cv2.imencode('.jpeg', img_bgr)[1].tobytes()
    return img_jpeg

def display_image(image_data):
    clear_output(wait=True)
    display(Image(image_data))

def image_to_numpy(carla_image):
    array = np.frombuffer(carla_image.raw_data, dtype=np.dtype("uint8"))
    array = np.reshape(array, (carla_image.height, carla_image.width, 4))
    array = array[:, :, :3]
    array = array[:, :, ::-1]
    return array


In [55]:
def run_camera():
    global simulation_running
    try:
        while simulation_running:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        camera.stop()
        vehicle.destroy()

def clip_angle(angle):
    return max(-1.0, min(1.0, angle))


def control_vehicle_steering(vehicle, steering_angle):
    control = vehicle.get_control()
    control.steer = clip_angle(steering_angle / 180.0)  # Convert the angle to the -1 to 1 range expected by Carla
    vehicle.apply_control(control)


def on_new_image(image):
    global img_widget, vehicle, inference_started
    image_data = process_image(image)
    img_widget.value = image_data

    if inference_started:
        # Get the image as a numpy array
        image_np = image_to_numpy(image)

        # Predict the steering angle using the loaded model
        steering_angle = predict_steering_angle(image_np)
        logger.info("Predicted steering angle: {}".format(steering_angle))

        # Set the steering control for the vehicle
        control_vehicle_steering(vehicle, steering_angle)

    
def print_available_worlds():
    remote_host_ip = host_ip_address
    remote_host_port = 2000
    
    client = carla.Client(remote_host_ip, remote_host_port)
    client.set_timeout(10.0)

    print("Available maps:")
    for map_name in client.get_available_maps():
        print(map_name)
        
    
def start_simulation():
    global img_widget, camera, vehicle
    
    remote_host_ip = host_ip_address
    remote_host_port = 2000

    client = carla.Client(remote_host_ip, remote_host_port)
    client.set_timeout(10.0)

     # Load layered map for Town 01 with minimum layout plus buildings and parked vehicles
    world = client.load_world('Town10HD')

    blueprint_library = world.get_blueprint_library()

    # Spawn a vehicle
    vehicle_bp = blueprint_library.find('vehicle.tesla.model3')
    spawn_points = world.get_map().get_spawn_points()
    vehicle = world.spawn_actor(vehicle_bp, random.choice(spawn_points))

    # Attach a camera sensor to the vehicle
    camera_bp = blueprint_library.find('sensor.camera.rgb')
    camera_bp.set_attribute('image_size_x', '800')
    camera_bp.set_attribute('image_size_y', '600')
    camera_bp.set_attribute('fov', '110')

    camera_transform = carla.Transform(carla.Location(x=2.5, z=1.2))
    camera = world.spawn_actor(camera_bp, camera_transform, attach_to=vehicle)

    camera.listen(on_new_image)

    camera_thread = threading.Thread(target=run_camera, daemon=True)
    camera_thread.start()


In [56]:
def accelerate(vehicle, throttle=0.5, duration=1):
    control = vehicle.get_control()
    control.throttle = throttle
    vehicle.apply_control(control)
    time.sleep(duration)
    control.throttle = 0
    vehicle.apply_control(control)

def brake(vehicle, brake_strength=0.5, duration=1):
    control = vehicle.get_control()
    control.brake = brake_strength
    vehicle.apply_control(control)
    time.sleep(duration)
    control.brake = 0
    vehicle.apply_control(control)

def steer(vehicle, steering_angle, duration=1):
    control = vehicle.get_control()
    control.steer = steering_angle
    vehicle.apply_control(control)
    time.sleep(duration)
    control.steer = 0
    vehicle.apply_control(control)

def reverse(vehicle, throttle=-0.5, duration=1):
    control = vehicle.get_control()
    control.throttle = throttle
    vehicle.apply_control(control)
    time.sleep(duration)
    control.throttle = 0
    vehicle.apply_control(control)


In [57]:
def maintain_speed(vehicle, target_speed):
    current_speed = get_speed(vehicle)
    throttle = 0.0

    if current_speed < target_speed:
        throttle = min((target_speed - current_speed) * 0.05, 1.0)
    elif current_speed > target_speed:
        throttle = max((target_speed - current_speed) * 0.05, -1.0)

    vehicle_control = vehicle.get_control()
    vehicle_control.throttle = throttle
    vehicle.apply_control(vehicle_control)

def get_speed(vehicle):
    velocity = vehicle.get_velocity()
    return 3.6 * math.sqrt(velocity.x**2 + velocity.y**2 + velocity.z**2)

def stop_vehicle(vehicle):
    vehicle_control = vehicle.get_control()
    vehicle_control.throttle = 0.0
    vehicle_control.brake = 0.0
    vehicle.apply_control(vehicle_control)

def maintain_speed_thread(vehicle, target_speed):
    global speed_control_active, should_stop_car
    while speed_control_active:
        maintain_speed(vehicle, target_speed)
        logger.info("Current speed: {:.2f} km/h".format(get_speed(vehicle)))
        time.sleep(0.1)

def on_maintain_speed_click(button):
    global speed_control_active, should_stop_car, vehicle, target_speed

    if not speed_control_active:
        button.description = "Stop Maintaining Speed"
        speed_control_active = True
        time.sleep(0.2)
        speed_thread = threading.Thread(target=maintain_speed_thread, args=(vehicle, target_speed), daemon=True)
        speed_thread.start()
    else:
        button.description = "Maintain Speed"
        speed_control_active = False
        stop_vehicle(vehicle)


In [58]:
accelerate_button = widgets.Button(description="Accelerate")
brake_button = widgets.Button(description="Brake")
steer_left_button = widgets.Button(description="Steer Left")
steer_right_button = widgets.Button(description="Steer Right")
reverse_button = widgets.Button(description="Reverse")
maintain_speed_button = widgets.Button(description="Maintain Speed")
start_inference_button = widgets.Button(description="Start Inference")

def on_accelerate_click(button):
    threading.Thread(target=accelerate, args=(vehicle,), daemon=True).start()

def on_brake_click(button):
    threading.Thread(target=brake, args=(vehicle,), daemon=True).start()

def on_steer_left_click(button):
    threading.Thread(target=steer, args=(vehicle, -0.5), daemon=True).start()

def on_steer_right_click(button):
    threading.Thread(target=steer, args=(vehicle, 0.5), daemon=True).start()

def on_reverse_click(button):
    threading.Thread(target=reverse, args=(vehicle,), daemon=True).start()

def on_start_inference_button_click(button):
    global inference_started

    if not inference_started:
        button.description = "Stop Inference"
        inference_started = True
    else:
        button.description = "Start Inference"
        inference_started = False

accelerate_button.on_click(on_accelerate_click)
brake_button.on_click(on_brake_click)
steer_left_button.on_click(on_steer_left_click)
steer_right_button.on_click(on_steer_right_click)
reverse_button.on_click(on_reverse_click)
maintain_speed_button.on_click(on_maintain_speed_click)
start_inference_button.on_click(on_start_inference_button_click)

controls_box = widgets.HBox([
    accelerate_button,
    brake_button,
    steer_left_button,
    steer_right_button,
    reverse_button,
    maintain_speed_button,
    start_inference_button
])
display(controls_box)


HBox(children=(Button(description='Accelerate', style=ButtonStyle()), Button(description='Brake', style=Button…

In [59]:
display(img_widget)

Image(value=b'')

In [60]:
def on_button_click(button):
    global simulation_running
    if not simulation_running:
        button.description = "Stop Simulation"
        simulation_running = True
        start_simulation()
    else:
        button.description = "Start Simulation"
        simulation_running = False
        

stop_button = widgets.Button(description="Start Simulation" if not simulation_running else "Stop Simulation")
stop_button.on_click(on_button_click)
display(stop_button)


Button(description='Start Simulation', style=ButtonStyle())