In [1]:
# Imports

import carla
import math
import random
import time
import cv2
import sys
import numpy as np
import threading
#import tensorflow_addons as tfa
#distributions = tf.compat.v1.distributions  # or find the corresponding TensorFlow 2.x distribution module




In [2]:
# Setup

client = carla.Client('localhost', 2000)
world = client.get_world()

bp_lib = world.get_blueprint_library()
spawn_points = world.get_map().get_spawn_points()

car_bp = bp_lib.find('vehicle.lincoln.mkz_2020')
truck_bp = bp_lib.find('vehicle.carlamotors.firetruck')

spectator = world.get_spectator()

town_map = world.get_map()
print(town_map)


for actor in world.get_actors().filter('vehicle.*'):
    actor.destroy()


# Coordinates
# Initial and destination coordinates for both the car and the truck are defined.
x_car = 78.78953552246094
y_car = 218.3877716064453 
z_car = 2.189335823059082

dest_x_car = 238.78953552246094
dest_y_car = 177.3877716064453 
dest_z_car = z_car

x_truck = 109
y_truck = 160.6210479736328
z_truck = 1.6630632877349854

dest_x_truck = 259
dest_y_truck = y_truck
dest_z_truck = z_truck


# The closest spawn points for the car, truck, and their destinations are determined based on the coordinates.
start_point_car = min(spawn_points, key=lambda spawn_point: spawn_point.location.distance(carla.Location(x_car, y_car, z_car)))
dest_point_car = min(spawn_points, key=lambda spawn_point: spawn_point.location.distance(carla.Location(dest_x_car, dest_y_car, dest_z_car)))

start_point_truck = min(spawn_points, key=lambda spawn_point: spawn_point.location.distance(carla.Location(x_truck, y_truck, z_truck)))
dest_point_truck = min(spawn_points, key=lambda spawn_point: spawn_point.location.distance(carla.Location(dest_x_truck, dest_y_truck, dest_z_truck)))

# Spawn the vehicle at the closest spawn point
car = world.try_spawn_actor(car_bp, start_point_car)
truck = world.try_spawn_actor(truck_bp, start_point_truck)





Map(name=Carla/Maps/Town06)


In [3]:

pref_speed = 50
speed_threshold = 2

def maintain_speed(s):
    if s >= pref_speed:
        return 0
    elif s < pref_speed - speed_threshold:
        return 0.8
    else:
        return 0.4

In [4]:

def angle_between(v1, v2):
    return math.degrees(np.arctan2(v1[1], v1[0]) - np.arctan2(v2[1], v2[0]))

def get_angle(car, wp):
    car_pos = car.get_transform()
    car_x = car_pos.location.x
    car_y = car_pos.location.y
    wp_x = wp.transform.location.x
    wp_y = wp.transform.location.y

    x = (wp_x - car_x) / ((wp_y - car_y)**2 + (wp_x - car_x)**2)**0.5
    y = (wp_y - car_y) / ((wp_y - car_y)**2 + (wp_x - car_x)**2)**0.5

    car_vector = car_pos.get_forward_vector()
    degrees = angle_between((x,y), (car_vector.x, car_vector.y))

    return degrees

In [5]:
# Test the angle function
# print(get_angle(car, route_car[0][0]))

In [6]:
def calculate_distance(actor1, actor2):
    location1 = actor1.get_location()
    location2 = actor2.get_location()
    distance = location1.distance(location2)
    return distance


In [7]:
# Route planner

def plan_route(world, start_point, dest_x, dest_y, dest_z, sampling_resolution=1, debug_draw=False):
    
    # This following line needs to be changed according to YOUR installation path
    carla_path = r'Z:\Documents\WindowsNoEditor\PythonAPI\carla'
    sys.path.append(carla_path)
    from agents.navigation.global_route_planner import GlobalRoutePlanner

    point_a = start_point.location
    point_b = carla.Location(x=dest_x, y=dest_y, z=dest_z)

    grp = GlobalRoutePlanner(world.get_map(), sampling_resolution)

    distance = 1000

    for loc in spawn_points:
        cur_route = grp.trace_route(point_a, point_b)
        if len(cur_route) < distance:
            distance = len(cur_route)
            route = cur_route

    if debug_draw:
        for waypoint in route:
            world.debug.draw_string(
                waypoint[0].transform.location,
                '^',
                draw_shadow=False,
                color=carla.Color(r=0, g=0, b=255),
                life_time=60.0,
                persistent_lines=True
            )

    return route

# route_truck = plan_route(world, start_point_truck, dest_x_truck, dest_y_truck, dest_z_truck, sampling_resolution=1, debug_draw=True)
# route_car = plan_route(world, start_point_car, dest_x_car, dest_y_car, dest_z_car, sampling_resolution=1, debug_draw=True)

In [8]:
def drive_vehicle(vehicle, route, other_vehicle):
    curr_wp = 0
    predicted_angle = get_angle(vehicle, route[0][0])

    while curr_wp < len(route) - 1:
        world.tick()

        # Additional conditions or user input for stopping the simulation
        # Calculate distance between the vehicles
        distance_to_other_vehicle = calculate_distance(vehicle, other_vehicle)
        print(distance_to_other_vehicle)

        # Check if the distance is below a threshold
        if distance_to_other_vehicle < 20:
            vehicle.apply_control(carla.VehicleControl(throttle=0, steer=0, brake=1))
            other_vehicle.apply_control(carla.VehicleControl(throttle=0, steer=0, brake=1))
            return


        while curr_wp < len(route) and vehicle.get_transform().location.distance(route[curr_wp][0].transform.location) < 5:
            curr_wp += 1

        predicted_angle = get_angle(vehicle, route[curr_wp][0])
        v = vehicle.get_velocity()
        speed = round(3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2), 0)
        estimated_throttle = maintain_speed(speed)

        if predicted_angle < - 300:
            predicted_angle = predicted_angle + 360
        elif predicted_angle > 300:
            predicted_angle = predicted_angle - 360
        steer_input = predicted_angle

        if predicted_angle < -40:
            steer_input = -40
        elif predicted_angle > 40:
            steer_input = 40
        
        steer_input = steer_input / 75

        vehicle.apply_control(carla.VehicleControl(throttle=estimated_throttle, steer=steer_input))

    vehicle.apply_control(carla.VehicleControl(throttle=0, steer=0, brake=1))


In [9]:
# Test for running

route_truck = plan_route(world, start_point_truck, dest_x_truck, dest_y_truck, dest_z_truck, sampling_resolution=1, debug_draw=True)
route_car = plan_route(world, start_point_car, dest_x_car, dest_y_car, dest_z_car, sampling_resolution=1, debug_draw=True)

car_thread = threading.Thread(target=drive_vehicle, args=(car, route_car, truck))
truck_thread = threading.Thread(target=drive_vehicle, args=(truck, route_truck, car))

# Start the threads
car_thread.start()
truck_thread.start()

# Wait for both threads to finish
car_thread.join()
truck_thread.join()

83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.16382598876953
83.1638259