In [1]:
import numpy as np
import math
import random
import carla
import time

In [2]:
client = carla.Client('localhost', 2000)
client.set_timeout(10.0)
world = client.load_world('Town03')

In [3]:
blueprint_library = world.get_blueprint_library()
vehicle_bp = blueprint_library.filter("vehicle.audi.tt")[0]
starting_point = world.get_map().get_spawn_points()[0]
# print(starting_point)
vehicle = world.try_spawn_actor(vehicle_bp,starting_point )
vehicle_pos = vehicle.get_transform()
lidar_bp = blueprint_library.find('sensor.lidar.ray_cast')
lidar_blueprint = blueprint_library.find("sensor.lidar.ray_cast")
lidar_blueprint.set_attribute("channels", str(32))
lidar_blueprint.set_attribute("points_per_second", str(100000))
lidar_blueprint.set_attribute("rotation_frequency", str(10.0))
lidar_blueprint.set_attribute("upper_fov", str(30.0))
lidar_blueprint.set_attribute("lower_fov", str(-25.0))
lidar_blueprint.set_attribute("range", str(100.0))
spectator_rotation = vehicle_pos.rotation
lidar_transform = carla.Transform(starting_point.location, spectator_rotation)
lidar_sensor = world.spawn_actor(lidar_blueprint, lidar_transform, attach_to = vehicle)

In [4]:
def get_steering_angle(vehicle_location, target_location, vehicle_transform):
            dx = target_location[0] - vehicle_location.location.x
            dy = target_location[1] - vehicle_location.location.y
            target_angle = math.atan2(dy, dx)
            vehicle_heading = math.radians(vehicle_transform.rotation.yaw)
            angle_diff = target_angle - vehicle_heading
            while angle_diff > math.pi:
                angle_diff -= 2 * math.pi
            while angle_diff < -math.pi:
                angle_diff += 2 * math.pi
            return angle_diff
    

In [None]:
# Node class representing a state in the space
class Node:
    def __init__(self, x, y):
        self.x = x
        self.y = y
        self.parent = None
        self.cost = 0

# RRT* algorithm
class RRTStar:
    def __init__(self, start, goal,map_size, carla_world, vehicle,step_size=1.0, max_iter=500):
        self.start = Node(start[0],start[1])
        self.goal = Node(goal[0], goal[1])
        self.map_size = map_size
        self.step_size = step_size
        self.max_iter = max_iter
        self.node_list = [self.start]
        self.goal_region_radius = 1.5
        self.search_radius = 2.0
        self.path = None
        self.goal_reached = False
        self.carla_world = carla_world
        self.vehicle = vehicle
        self.lidar_data = []
        
    def get_lidar_data(self):
         lidar_sensor.listen(lambda data: self.process_lidar_data(data))

    def process_lidar_data(self, data):
        self.lidar_data = self.get_lidar_data()
        lidar = []
        for detection in data:
            ox = detection.point.x
            oy = detection.point.y
            size = detection.point.z
            lidar.append((ox, oy, size))
        self.lidar_data = lidar  
        print("Lidar Data Updated:", lidar)

    def get_nearest_node(self, node_list, rand_node):
        distances = [np.linalg.norm([node.x - rand_node.x, node.y - rand_node.y]) for node in node_list]
        nearest_node_idx = np.argmin(distances)
        return node_list[nearest_node_idx]

    def plan(self):
        """Main RRT* planning loop."""
        for i in range(self.max_iter):
            rand_node = self.get_random_node()
            nearest_node = self.get_nearest_node(self.node_list, rand_node)
            new_node = self.steer(nearest_node, rand_node)

            if self.is_collision_free(new_node):
                neighbors = self.find_neighbors(new_node)
                new_node = self.choose_parent(neighbors, nearest_node, new_node)
                self.node_list.append(new_node)
                self.rewire(new_node, neighbors)

            if self.reached_goal(new_node):
                self.path = self.generate_final_path(new_node)
                self.goal_reached = True
                self.move_vehicle_along_path()
                return

    def get_random_node(self):
        if random.random() > 0.2:
            return Node(random.uniform(0, self.map_size[0]), random.uniform(0, self.map_size[1]))
        else:
            return Node(self.goal.x, self.goal.y)
        
    def steer(self, from_node, to_node):
        """Steer from one node to another, step-by-step."""
        theta = math.atan2(to_node.y - from_node.y, to_node.x - from_node.x)
        new_node = Node(from_node.x + self.step_size * math.cos(theta),
                        from_node.y + self.step_size * math.sin(theta))
        new_node.cost = from_node.cost + self.step_size
        new_node.parent = from_node
        return new_node

    def is_collision_free(self, node):
        """Check if the node is collision-free with respect to obstacles."""
        for (ox, oy, size) in self.lidar_data:
            if (node.x - ox) ** 2 + (node.y - oy) ** 2 <= (size ** 2):
                return False
        return True

    def find_neighbors(self, new_node):
        """Find nearby nodes within the search radius."""
        return [node for node in self.node_list
                if np.linalg.norm([node.x - new_node.x, node.y - new_node.y]) < self.search_radius]

    def choose_parent(self, neighbors, nearest_node, new_node):
        """Choose the best parent for the new node based on cost."""
        min_cost = nearest_node.cost + np.linalg.norm([new_node.x - nearest_node.x, new_node.y - nearest_node.y])
        best_node = nearest_node

        for neighbor in neighbors:
            cost = neighbor.cost + np.linalg.norm([new_node.x - neighbor.x, new_node.y - neighbor.y])
            if cost < min_cost and self.is_collision_free(neighbor):
                best_node = neighbor
                min_cost = cost

        new_node.cost = min_cost
        new_node.parent = best_node
        return new_node

    def rewire(self, new_node, neighbors):
        """Rewire the tree by checking if any neighbor should adopt the new node as a parent."""
        for neighbor in neighbors:
            cost = new_node.cost + np.linalg.norm([neighbor.x - new_node.x, neighbor.y - new_node.y])
            if cost < neighbor.cost and self.is_collision_free(neighbor):
                neighbor.parent = new_node
                neighbor.cost = cost

    def reached_goal(self, node):
        """Check if the goal has been reached."""
        return np.linalg.norm([node.x - self.goal.x, node.y - self.goal.y]) < self.goal_region_radius

    def generate_final_path(self, goal_node):
        """Generate the final path from the start to the goal and drive along it."""
        path = []
        node = goal_node
        while node is not None:
            path.append([node.x, node.y])
            node = node.parent
        return path[::-1]  # Reverse path
    
    def move_vehicle_along_path(self):
        if self.path:
            for waypoints in self.path:
                target_location =[waypoints[0], waypoints[1], 0]
                steering_angle = get_steering_angle(starting_point, target_location, vehicle_pos)
                max_steering_angle = math.radians(30)
                steering = max(-1.0, min(1.0, steering_angle / max_steering_angle))
                vehicle.apply_control(carla.VehicleControl(throttle=0.5, brake=0.0, steer=steering))
                time.sleep(0.1) 


In [6]:
def main():  
    try:
        start_location = [starting_point.location.x, starting_point.location.y]
        goal_location = [start_location[0]+ 100 , start_location[1] + 50]
        rrt_star = RRTStar(start=start_location, goal=goal_location, map_size=[100, 100], carla_world=world, vehicle=vehicle)
        rrt_star.plan()
    finally:
        if vehicle is not None:
            vehicle.destroy()
        if lidar_sensor is not None:
            lidar_sensor.destroy()
        print("Simulation ended.")



In [7]:
if __name__ == "__main__":
    main()

Simulation ended.
