## Initial imports

In [1206]:
import carla
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import sys
from agents.navigation.global_route_planner_dao import GlobalRoutePlannerDAO
from agents.navigation.global_route_planner import GlobalRoutePlanner
from algs import splinepath
from agents.navigation.local_planner import RoadOption
import time

In [1207]:
rg = np.random.default_rng()

## Connect to a running Carla server and get main pointers

Get pointer to Carla and load a world

In [1208]:
HOST = 'localhost'
PORT = 2000
client = carla.Client(HOST, PORT)
client.set_timeout(2.0)

In [1209]:
client.load_world('Town03')  # Town01-Town07 are available

RuntimeError: failed to connect to newly created map

_wp =  Waypoint(Transform(Location(x=215.318863, y=-5.264622, z=0.000000), Rotation(pitch=0.000000, yaw=-179.144196, roll=0.000000)))
car_wp next index in route =  0
Old_route =  [(215.31796264648438, -5.2646355628967285, <RoadOption.LANEFOLLOW: 4>), (210.31851196289062, -5.339315414428711, <RoadOption.LANEFOLLOW: 4>), (205.31907653808594, -5.413994789123535, <RoadOption.LANEFOLLOW: 4>), (200.3196258544922, -5.48867416381835Get pointers to the world and blueprint library (templates to all actors and vehicles).

In [1234]:
world = client.get_world()
bpl = world.get_blueprint_library()
sp = world.get_spectator()  # Get pointer to the spectator, i.e., camera in the server window

It is easy to manipulate the world, e.g., change the wather.

In [1235]:
weather = world.get_weather()
world.set_weather(weather.ClearNoon)

Get information about the spectator, e.g., position and orientation. Experiment by moving around in the world and see how things change.

### Spawn a car in the world

This is an example planning mission (defined for Town03). Positions specified using the carla ```Location``` type.

In [1260]:
start_point = carla.Location(x=243, y=40, z=1)  # Start point
end_point = carla.Location(x=244, y=-167, z=1)  # End point

Find a random type of Audi in the library of available vehicles.

In [1261]:
bp = rg.choice(bpl.filter('vehicle.audi.*'))

Spawn the car at the start point. Go and look for it in the world. It is also a good idea to keep track of created actors in the world so that you can clean up at the end.

In [1262]:
actors = []
car = world.spawn_actor(bp, carla.Transform(start_point))
actors.append(car)

### Plan a mission

Make a plan using the carla provided A* planner (you are welcome to integrate your planner from hand-in 1 if you like but you do not have to). Also create a ```SplinePath ``` object of the resulting plan.

In [1263]:
RESOLUTION = 1
_map = world.get_map()
dao = GlobalRoutePlannerDAO(_map, sampling_resolution=RESOLUTION)
rp = GlobalRoutePlanner(dao)
rp.setup()
route = rp.trace_route(start_point, end_point)

## Create path object using nodes
p = np.array([(ri[0].transform.location.x, ri[0].transform.location.y)
              for ri in route])
path = splinepath.SplinePath(p, min_grid=3)

Move car and spectator to beginning of plan.

In [1264]:
t = route[0][0].transform
car.set_transform(t)
t.location.z += 15  # 15 meters above car
sp.set_transform(t)

## Add a car(obstacle) on the calculated path

In [1265]:
car_obs = world.spawn_actor(bp, carla.Transform(route[15][0].transform.location))
car_obs2 = world.spawn_actor(bp, carla.Transform(route[80][0].transform.location))
actors.append(car_obs)
actors.append(car_obs2)
t = route[15][0].transform
car_obs.set_transform(t)
t = route[70][0].transform
car_obs2.set_transform(t)

## State feedback controller

In [1266]:

class StateFeedbackController:
    def __init__(self, K, L, path=None, goal_tol=1):
        self.plan = path
        self.K = K
        self.goal_tol = goal_tol
        self.d = []
        self.delta = []
        self.theta_e = []
        self.s_p = []
        self.L = L
        self.s0 = 0

        self.t = []
        self.w = []

    def heading_error(self, heading, s):
        """Compute theta error"""
        heading0, nc = self.plan.heading(s)
        cos_alpha = heading.dot(heading0)
        sin_alpha = np.float(np.cross(heading0, heading))

        theta_e = np.arctan2(sin_alpha, cos_alpha)
        return theta_e

    def u(self, t, w):
        def glob_stab_fact(x):
            """Series expansion of sin(x)/x around x=0."""
            return 1 - x**2/6 + x**4/120 - x**6/5040

        a = 0
        x, y, theta, v = w
        self.w.append(w)
        p_car = w[0:2]
        si, d = self.plan.project(p_car, self.s0, ds=2, s_lim=20)
        self.s0 = si

        heading = np.array([np.cos(theta), np.sin(theta)])
        theta_e = self.heading_error(heading, si)

        # No feed-forward term
        u =  - self.K.dot(np.array([d*glob_stab_fact(theta_e), theta_e]))[0]
        delta = np.max((-1.0, np.min((1.0, self.L*u))))
        
        self.d.append(d)
        self.delta.append(delta)
        self.s_p.append(si)
        self.theta_e.append(theta_e)
        self.t.append(t)

        return np.array([delta, a])
    
    def run(self, t, w):
        p_goal = self.plan.path[-1, :]
        p_car = w[0:2]
        dp = p_car - p_goal
        dist = np.sqrt(dp.dot(dp))
        if dist < self.goal_tol:
            return False
        else:
            return True

## Calcualte path

In [1267]:
def calculate_path(route, actors, RADIUS=RESOLUTION, _map=_map):
    def _is_equal_location(loc1, loc2, offset=RADIUS):
        return round(np.abs(loc1.x - loc2.x),0) < offset and round(np.abs(loc1.y - loc2.y),0) < offset
    
    def _is_next_wp(wp1, wp2, offset=RADIUS):
        return round(abs(wp1.s-wp2.s)) < offset
    #Location of all obstacles, should be exchanged with waypoint generated by RPLidar
    obstacles = [] 
    for i in range(1, len(actors)):
        obstacles.append(actors[i].get_transform().location)
    car_wp = _map.get_waypoint(actors[0].get_transform().location, project_to_road=True, lane_type=carla.LaneType.Driving) 
    
    #Look is there is a obstacle in the same lane within the look ahead distance, if there is save the distance 
    i = 1
    found_obs = False
    look_ahead_wp = car_wp
    look_ahead_distance = 25
    while True:
        look_ahead_wp = look_ahead_wp.next(RADIUS)[0]
        for obs in obstacles:
            if _is_equal_location(obs, look_ahead_wp.transform.location):
                found_obs = True
                break
        if found_obs:
            break
        if i == look_ahead_distance:
            break
        i = i + 1
    dist = i
    print("Found obs = ", found_obs)
    print("dist = ", dist)
    #Find index of the waypoint closest to and ahead of car in the current route (-1 if not found).
    index = -1
    for j in range(len(route)):
        loc_route = route[j][0].transform.location #Location at point with index j in current route
        loc_car = car_wp.transform.location #Location of car
        if round(car_wp.s,0) <= round(route[j][0].s,0) and _is_next_wp(car_wp, route[j][0]):
            index = j
            break
    #If the car is not on the path then there is no remaning path
    if index == -1:
        return {"route": [], "path": []}
    #The remaining part of the route is the new route (index+1 to include the waypoint at index)
    temp_route = route[index+1:]  
    
    #If an obstacle is found then modify the current route     
    if found_obs: 
        #Turn left (to start with assume left lane is available and free)
        temp_route[index] = (temp_route[index][0], RoadOption.CHANGELANELEFT)
        #Get the corresponding left lane waypoint (~90 deg left from car)
        left_wp = temp_route[index][0].get_left_lane().next(RADIUS)[0]
        #Drive in the left lane two times the distance to the target.
        for k in range(index + 1, index + dist*2):
            left_wp = left_wp.next(RADIUS)[0]
            temp_route[k] = (left_wp, RoadOption.LANEFOLLOW)
        #Change to the original lane
        temp_route[index+dist*2] = (left_wp, RoadOption.CHANGELANERIGHT)
        #When car is back in the original lane temporaly remove the remaining part of the route.
        del temp_route[index+dist*2+1:]
        #Index in the route for the closest ahead waypoint after the overtaking is done
        index_after = -1
        for j in range(len(route)):
            wp_route = route[j][0]
            wp_car = left_wp.get_right_lane().next(RADIUS)[0].next(RADIUS)[0]
            if round(wp_car.s,0) <= round(wp_route.s,0) and _is_next_wp(wp_car, wp_route) and wp_car.lane_id == wp_route.lane_id:
                index_after = j
                break
        #If there is an remaining route after the overtake, then extend the new route with the remaining part from there of the current route
        if index_after != -1:
            temp_route.extend(route[index_after:])
        
    #Calculate new spline path from new route 
    temp_p = np.array([(ri[0].transform.location.x, ri[0].transform.location.y)
              for ri in temp_route])
    temp_path = splinepath.SplinePath(temp_p, min_grid=3)
    
    return {"path": temp_path, "route": temp_route}

# Run car

## Initial control

In [1268]:
init_transform = route[0][0].transform # Set car in initial position of the plan
car.set_transform(init_transform)
car.apply_control(carla.VehicleControl(throttle=0, steer=0))


## Control along path

In [1269]:
K = np.array([0.1, 0.25]).reshape((1, 2))
L = 3.5
ctrl = StateFeedbackController(K, L, path)
car_states = []
tl = []
i = 0
while True:
    
    if i % 250 == 0:
        print("planning path")
        #car.apply_control(carla.VehicleControl(throttle=0, steer=0))
        plan = calculate_path(route=route, actors=actors)
        route = plan["route"]
        path = plan["path"]
        ctrl = StateFeedbackController(K, L, path)
        T = 2  # Time before line dissapears, negative for never
        s = np.linspace(0, path.length, 500)
        for s1, s2 in zip(s[:-1], s[1:]):
            s1_loc = carla.Location(x=float(path.x(s1)), y=float(path.y(s1)), z=0.5)
            s2_loc = carla.Location(x=float(path.x(s2)), y=float(path.y(s2)), z=0.5)
            world.debug.draw_line(s1_loc, s2_loc, thickness=0.35, 
                                  life_time=T, color=carla.Color(b=255))
        
    if ctrl.s0 > path.length-5:
        break
    
    tck = world.wait_for_tick(1)
    t = car.get_transform()
    v = car.get_velocity()
    v = np.sqrt(v.x**2+v.y**2+v.z**2)
    w = np.array([t.location.x, t.location.y, t.rotation.yaw*np.pi/180.0, v])
    car_states.append(w)

    # Compute control signal and apply to car
    u = ctrl.u(tck.timestamp.elapsed_seconds, w)
    car.apply_control(carla.VehicleControl(throttle=0.4, steer=u[0]))
    
    i = i + 1

planning path
Found obs =  True
dist =  15
planning path
Found obs =  False
dist =  25
planning path
Found obs =  False
dist =  25
planning path
Found obs =  True
dist =  21
planning path
Found obs =  True
dist =  4


KeyboardInterrupt: 

## Stop car

## Cleanup

If you have kept pointers to all created actors, cleanup is simple

In [1259]:
print(actors)
for a in actors:
    a.destroy()
actors = []

[]


In [1204]:
car_obs.destroy()

False

In [1205]:
car.destroy()

False