In [18]:
#all imports
import carla #the sim library itself
import random #to pick random spawn point
import cv2 #to work with images from cameras
import numpy as np #in this example to change image representation - re-shaping
import keyboard # to interrupt while loops
import os

# for math and calculating time and shit
import time
import math
import random


# To import a behavior agents for controlling car
from agents.navigation.basic_agent import BasicAgent
from agents.navigation.behavior_agent import BehaviorAgent
# import prediction model
import importlib
import predict_control
importlib.reload(predict_control)
from predict_control import predict

#ui imports
import tkinter as tk


In [19]:
# connect to the sim, create the world
client = carla.Client('localhost', 2000)

# the town we've been using is town04, feel free to load different towns, goes to 012 i believe
client.load_world('Town04')

#define environment/world and get possible places to spawn a car
world = client.get_world()


spawn_points = world.get_map().get_spawn_points()

# function for clearing da world
def clear_world():
    # clears all vehicles + sensors and sets them to None
    for actor in world.get_actors().filter('*vehicle*'):
        actor.destroy()
        actor = None
    for sensor in world.get_actors().filter('*sensor*'):
        sensor.destroy()
        sensor = None
        

In [23]:
# vehicle setup
class Vehicle:
    def __init__(self, world, vehicle_id, vehicle_name, collision_callback, **callback_kwargs):
        
        # intialize reference world + diff spawn points + list of diff waypoints
        self.world = world
        self.spawn_points = world.get_map().get_spawn_points()
        self.waypoint_list = world.get_map().generate_waypoints(distance=2.0)
        self.current_waypoint = None # current destination of car
        self.using_spawnpoints = True
        self.spawned_at = 0
        
        # vehicle variables
        self.vehicle_id = vehicle_id
        self.vehicle = None
        self.agent = None
        self.collision_sensor = None
        self.collision_callback = collision_callback
        self.callback_kwargs = callback_kwargs # arguments for callback to customize respawn characteristics
        self.route = None
        self.on_autopilot = False
        
        # picture capturing variables
        self.camera_sem = None
        self.camera_rgb = None
        self.camera_data = None
        
        # save directories for each vehicle, make one if they don't exist
        rgb_directory = f'out_sem/{vehicle_name}/rgb'
        sem_directory = f'out_sem/{vehicle_name}/sem'
        
        if not os.path.exists(rgb_directory):
            os.makedirs(rgb_directory)
            
        if not os.path.exists(sem_directory):
            os.makedirs(sem_directory)
        
        self.camera_rgb_save_path = rgb_directory
        self.camera_sem_save_path = sem_directory
        
        # state constants for intervals between pictures
        self.interval = 7.5 # set interval to 7.5 seconds
        self.sharp_steer_interval = 0.1
        self.light_steer_interval = 0.75
        self.last_capture_time = time.time() # track last time pic was taken
        
    
    def spawn(self, blueprint=None, spawn_point=0):
        print("spawning vehicle: ", self.vehicle_id)
        # default blueprint is model 3
        if blueprint is None:
            blueprint = self.world.get_blueprint_library().filter('*model3*')[0] # it's an array so just return the first model
        
        # spawn the vehicle
        start_point = self.spawn_points[spawn_point]
        self.spawned_at = spawn_point
        try:
            print(start_point)
            self.vehicle = self.world.spawn_actor(blueprint, start_point)
        except Exception as e:
            # try spawning at a diff spawn point
            spawn_point += 1
            print(f"Failed to spawn vehicle: {e}, attempting to spawn at point{spawn_point}")
            start_point = self.spawn_points[spawn_point]
            self.vehicle = self.world.spawn_actor(blueprint, start_point)
                
        # move simulator view to the car
#         spectator = world.get_spectator()
#         start_point.location.z = start_point.location.z+1 #start_point was used to spawn the car but we move 1m up to avoid being on the floor
#         spectator.set_transform(start_point)
        
        
            
        
        # intialize steering
        control = carla.VehicleControl(throttle=0.0, steer=0.0, brake=0.0)
        self.vehicle.apply_control(control)
        #lights always on
        self.vehicle.set_light_state(carla.VehicleLightState.LowBeam)

        # setup the rest of the vehicle
        self.setup_collision_sensor()
        self.setup_cameras() 
        self.setup_agent()
        
    def setup_collision_sensor(self):
        # get collision sensor blueprint
        collision_bp = self.world.get_blueprint_library().find('sensor.other.collision')

        # Set the transform relative to the vehicle
        transform = carla.Transform(carla.Location(x=1.5, z=2.4))  # Front of the vehicle

        # Spawn the collision sensor and attach it to the vehicle
        self.collision_sensor = self.world.spawn_actor(collision_bp, transform, attach_to=self.vehicle)

        # Listen for collision events
        self.collision_sensor.listen(lambda event: self.collision_callback(event, self, self.vehicle_id, **self.callback_kwargs))  
        
    def setup_cameras(self):
        blueprint_library = self.world.get_blueprint_library()

        # Semantic camera setup
        camera_bp = blueprint_library.find('sensor.camera.semantic_segmentation')
        camera_bp.set_attribute('image_size_x', '640')
        camera_bp.set_attribute('image_size_y', '480')
        camera_bp.set_attribute('fov', '90')
        
        self.camera_sem = world.spawn_actor(camera_bp,carla.Transform(carla.Location(z=1.3, x=1.4)), attach_to=self.vehicle)

        # RGB camera setup
        camera_bp = blueprint_library.find('sensor.camera.rgb')
        camera_bp.set_attribute('image_size_x', '640')
        camera_bp.set_attribute('image_size_y', '480')
        camera_bp.set_attribute('fov', '90')
        
        self.camera_rgb = world.spawn_actor(camera_bp, carla.Transform(carla.Location(z=1.3, x=1.4)), attach_to=self.vehicle)

        # Callbacks for camera data
        def sem_callback(image, data_dict):
            image.convert(carla.ColorConverter.CityScapesPalette)
            data_dict['sem_image'] = np.reshape(np.copy(image.raw_data), (image.height, image.width, 4))

        def rgb_callback(image, data_dict):
            data_dict['rgb_image'] = np.reshape(np.copy(image.raw_data), (image.height, image.width, 4))

        self.camera_data = {'sem_image': np.zeros((480, 640, 4)), 'rgb_image': np.zeros((480, 640, 4))}

        self.camera_sem.listen(lambda image: sem_callback(image, self.camera_data))
        self.camera_rgb.listen(lambda image: rgb_callback(image, self.camera_data))

    def setup_agent(self):

#         # start a basic agent for the car
#         self.agent = BasicAgent(self.vehicle)

        # give it a normal profile
        self.agent = BehaviorAgent(self.vehicle, behavior='normal') # maybe set to aggressive?

#         # usually will just be overwritten just a placeholder location (maybe put self.waypoint here idk)
#         self.current_waypoint = self.spawn_points[3].location # for now we just take the second spawn point
#                                       # later use a CARLA location at self.x + 10 or something
            
#         currLocation = self.vehicle.get_transform().location
#         distance = self.distance(self.current_waypoint, currLocation)
#         print(distance)
        
#         self.agent.set_destination(self.current_waypoint)
#         self.find_next_waypoint()
        
    def destroy(self):
        
        print("destroying vehicle: ", self.vehicle_id)
        if self.collision_sensor is not None and self.collision_sensor.is_alive:
            self.collision_sensor.stop() # idk if this function exists
            self.collision_sensor.destroy()
            self.collision_sensor = None
            
        if self.vehicle is not None and self.vehicle.is_alive:
            self.vehicle.destroy()
            self.vehicle = None
        
        if self.camera_sem is not None and self.camera_sem.is_alive:
            self.camera_sem.destroy()
            self.camera_sem = None
            
        if self.camera_rgb is not None and self.camera_rgb.is_alive:
            self.camera_rgb.destroy()
            self.camera_rgb = None
        
        if self.agent:
            self.agent = None
            
            
            
    # helper functions for running:
    
    
    def print_vehicle_controls(self):
        control = self.vehicle.get_control()
        normalized_steer = (control.steer + 1) /2
        normalized_throttle = (control.throttle)
        normalized_brake = (control.brake)
        print("Steer: {:.3f}, Throttle: {:.3f}, Brake: {:.3f}".format(normalized_steer, normalized_throttle, normalized_brake))

    def grab_and_display_image(self):
        #     print_vehicle_controls(vehicle)

        rgb_im = self.camera_data['rgb_image']
        sem_im = self.camera_data['sem_image']

#         # display images
        im_h = cv2.hconcat([rgb_im,sem_im])
        cv2.imshow('2 cameras', im_h)

        return rgb_im, sem_im

    def grab_and_display_sem(self):
        #     print_vehicle_controls(vehicle)

        sem_im = self.camera_data['sem_image']

#         # display images
#         im_h = cv2.hconcat([rgb_im,sem_im])
#         cv2.imshow('2 cameras', im_h)

        return sem_im
    def print_image_to_folder(self, control):
        rgb_im, sem_im = self.grab_and_display_image()
        # grab the time
        time_grab = time.time_ns()
        # normalize controls to between 0 and 100, with 50 being straight or nothing
        norm_steer = (control.steer + 1) /2
        norm_throttle = control.throttle
        norm_brake = control.brake
        
        # make image paths
        image_name = f'{time_grab:06d}_{norm_steer:.3f}_{norm_throttle:.3f}_{norm_brake:.3f}.png'
#         rgb_path = os.path.join(self.camera_rgb_save_path, image_name)
#         sem_path = os.path.join(self.camera_sem_save_path, image_name)
        rgb_path = f'{self.camera_rgb_save_path}/{image_name}'
        sem_path = f'{self.camera_sem_save_path}/{image_name}'
#         cv2.imwrite('out_sem/rgb/%06d_%.3f_%.3f_%.3f.png' % (time_grab, norm_steer, norm_throttle, norm_brake), rgb_im)
#         cv2.imwrite('out_sem/sem/%06d_%.3f_%.3f_%.3f.png' % (time_grab, norm_steer, norm_throttle, norm_brake), sem_im)
        cv2.imwrite(rgb_path, rgb_im)
        cv2.imwrite(sem_path, sem_im)
    
    def distance(self, location1, location2):
        return math.sqrt((location1.x - location2.x)**2 + (location1.y - location2.y)**2 + (location1.z - location2.z)**2)
    
    def find_next_waypoint(self, route):
        new_waypoint = None

        if self.current_waypoint is None or not isinstance(self.current_waypoint, int) :
            self.current_waypoint = 0
                # these are carla locations, NOT carla waypoints (no need to get transform)
            new_waypoint = route[self.current_waypoint] 
        else:
            
            # type is carla Location
            self.current_waypoint += 1 # increment index of waypoint (look at next waypoint in route)
            
            # go to 0 
            if self.current_waypoint >= len(route):
                self.current_waypoint = 0
            
            new_waypoint = route[self.current_waypoint]


        print("vehicle no", self.vehicle_id, "heading to waypoint no", route[self.current_waypoint], "at : ", new_waypoint)
        currLocation = self.vehicle.get_transform().location
        

        distance = self.distance(new_waypoint, currLocation)
        print("distance is: ", distance)
        self.agent.set_destination(new_waypoint)
        

        
    def find_next_random_waypoint(self):
        while True:
            new_waypoint = random.choice(self.waypoint_list)

            if self.current_waypoint is None or self.distance(self.current_waypoint, new_waypoint.transform.location) > 50:  # Check if new waypoint is at least 50 meters away
                break
        print("reached waypoint, heading to: ", new_waypoint) 
        self.agent.set_destination(new_waypoint.transform.location)
        self.current_waypoint = new_waypoint.transform.location
            
    # function that advances vehicle by ONE tick. main loop is run by fleet manager
    def run_step(self, route):
        if self.vehicle is None or self.agent is None:
            return
        try:
#             print("vehicle ", self.vehicle_id, "control: ", control)
#             print(control)
            # set route if none is seen
            if self.route == None:
                self.find_next_waypoint(route)
                self.route = route
                
            self.vehicle.apply_control(self.agent.run_step())
            
            self.check_control_and_print()
            
            
                
            # check if vehicle has arrived at its destination    
            if self.agent.done():
                self.find_next_waypoint(route)
        except Exception as e:
            print("Caught RuntimeError while trying to control the vehicle:", e)
    
    def check_control_and_print(self):
        control = self.vehicle.get_control()
        
        current_time = time.time()
        
        # light turn
        if abs(control.steer * 100) >= 5 and abs(control.steer * 100) < 10:
                if current_time - self.last_capture_time >= self.light_steer_interval:
                    self.print_image_to_folder(control)
                    self.last_capture_time = current_time
                    
        # sharp turn
        elif abs(control.steer * 100) >= 10:
            if current_time - self.last_capture_time >= self.sharp_steer_interval:
                self.print_image_to_folder(control)
                self.last_capture_time = current_time

        # drive straight
        elif current_time - self.last_capture_time >= self.interval: 
            self.print_image_to_folder(control)
            self.last_capture_time = current_time
        
    def run_predict_step(self, callback=None, *args, **kwargs):
        if self.vehicle is None or self.agent is None:
            return
        
        try:
            
            # get image information to 
            sem_im = self.grab_and_display_sem()
            rgb_im, sem_im = self.grab_and_display_image()
    
            prediction = predict(sem_im, CARLA=True)

            new_steering = 2 * (prediction[0] - 0.5)
            
            control = carla.VehicleControl(steer=new_steering, throttle=prediction[1] *0.75, brake=prediction[2] * 1.5)
            self.vehicle.apply_control(control)
            
            if callback is not None:
                callback(control=control, *args, **kwargs)
        except Exception as e:
            print("Caught RuntimeError while trying to control the vehicle:", e)

    def activate_autopilot(self):
        self.vehicle.set_autopilot(True)
        self.on_autopilot = True

    def run_autopilot_step(self):
        try:
#             print(self.vehicle.get_control())
            self.check_control_and_print()
            
        except Exception as e:
            print("Caught RuntimeError while trying to control the vehicle:", e)
        


## TRAFFIC MANAGER SETUP

In [27]:
# Set up the simulator in synchronous mode
settings = world.get_settings()
settings.synchronous_mode = True # Enables synchronous mode
settings.fixed_delta_seconds = 0.05
world.apply_settings(settings)

# Set up the TM in synchronous mode
traffic_manager = client.get_trafficmanager()
traffic_manager.set_synchronous_mode(True)

# Set a seed so behaviour can be repeated if necessary
traffic_manager.set_random_device_seed(2)
random.seed(2)

# Set traffic manager safety settings: no lane changes, drive at 80%, set global distance
traffic_manager.set_global_distance_to_leading_vehicle(5.0)
traffic_manager.global_percentage_speed_difference(0)


# Draw the spawn point locations as numbers in the map
# for i, spawn_point in enumerate(spawn_points):
#     world.debug.draw_string(spawn_point.location, str(i), life_time=10)

# Select some models from the blueprint library
models = ['dodge', 'audi', 'model3', 'mini', 'mustang', 'lincoln', 'prius', 'nissan', 'crown', 'impala']
blueprints = []
for vehicle in world.get_blueprint_library().filter('*vehicle*'):
    if any(model in vehicle.id for model in models):
        blueprints.append(vehicle)

# Set a max number of vehicles and prepare a list for those we spawn
max_vehicles = 200
max_vehicles = min([max_vehicles, len(spawn_points)])
vehicles = []

filtered_spawn_points = [sp for i, sp in enumerate(spawn_points) if i not in [0, 1, 2]]

sampled_spawn_points = random.sample(filtered_spawn_points, max_vehicles)

# Take a random sample of the spawn points and spawn some vehicles
for i, spawn_point in enumerate(sampled_spawn_points):
    try:
        if spawn_point is not spawn_points[0]:
            temp = world.try_spawn_actor(random.choice(blueprints), spawn_point)
            if temp is not None:
                vehicles.append(temp)
    except Exception as e:
        print("Failed to spawn actor:", e)


# Parse the list of spawned vehicles and give control to the TM through set_autopilot()
for vehicle in vehicles:
    vehicle.set_autopilot(True)
    traffic_manager.auto_lane_change(vehicle,False)
    traffic_manager.distance_to_leading_vehicle(vehicle, 5.0)
    traffic_manager.vehicle_percentage_speed_difference(vehicle, 0)

# Run the simulation so we can inspect the results with the spectator

# test run simulation with traffic

# while True:
#     world.tick()
    
#     if keyboard.is_pressed('q'):  # Check if 'q' is pressed
#         print("Quitting simulation.")
#         break

# clear_world()


AttributeError: 'Client' object has no attribute 'stop'

In [26]:
spectator = world.get_spectator()
def on_collision(event, vehicle, vehicle_id, respawn=False):
    
    print("Collision detected: Actor %s (vehicle %s) hit %s" % (event.other_actor.type_id, vehicle_id, event.actor.type_id))
    # Stop the vehicle (or apply any other logic you deem necessary)
    vehicle.destroy()
    vehicle.spawn(vehicle.spawned_at) #respawn the vehicle
    
    # may need to set up autopilot but idk
    if vehicle.on_autopilot:
        vehicle.activate_autopilot()
    
# set to synchronous mode
# settings = world.get_settings()
# settings.synchronous_mode = True  # Enables synchronous mode
# settings.fixed_delta_seconds = 0.05  # Sets the time step for each tick
# traffic_manager = client.get_trafficmanager()
# traffic_manager.set_synchronous_mode(True)
# world.apply_settings(settings)



# for tracking a specific route
# left_locations = [
#     carla.Location(x=-417, y=6, z=2),
#     carla.Location(x=140, y=220, z=2),
#     carla.Location(x=223.025146, y=-385.143005, z=0.281942),
# ]

# right_locations = [
#     carla.Location(x=-20, y=6, z=12.5),
#     carla.Location(x=-395, y=37, z=2),
#     carla.Location(x=11.2, y=-121, z=2),
#     carla.Location(x=225, y=-364, z=0)
# ]

try:
    vehicle = Vehicle(world=world, vehicle_id=0, vehicle_name="right_car", collision_callback=on_collision)
#     vehicle2= Vehicle(world=world, vehicle_id=1, vehicle_name="left_car", collision_callback=on_collision)

    vehicle.spawn(spawn_point=0)
#     vehicle2.spawn(spawn_point=2)
    world.tick()

#     vehicle.activate_autopilot()
#     vehicle2.activate_autopilot()

    while True:
        world.tick()
        # vehicle.run_step(route=right_locations)
        # vehicle2.run_step(route=left_locations)
        vehicle.run_predict_step()

#         vehicle.run_autopilot_step()q
#         vehicle2.run_autopilot_step()

        if keyboard.is_pressed('q'):  # Check if 'q' is pressed
            print("Quitting simulation.")
            break
    
finally:
    
    settings.synchronous_mode = False
    settings.fixed_delta_seconds = None
    world.apply_settings(settings)
    traffic_manager.set_synchronous_mode(False)
    clear_world()
    cv2.destroyAllWindows()
    vehicles = []

spawning vehicle:  0
Transform(Location(x=225.252029, y=-364.109375, z=0.281942), Rotation(pitch=0.000000, yaw=-0.295319, roll=0.000000))
Quitting simulation.


In [40]:


# Function to create the main UI window
def create_main_window():
    root = tk.Tk()
    root.title("Spectator Position in CARLA")
    label = tk.Label(root, text="Waiting for CARLA data...")
    label.pack(padx=20, pady=20)
    return root, label

root, position_label = create_main_window()

def update_position():
    # Assume 'client' and 'world' are already set up for CARLA
    spectator = world.get_spectator()
    transform = spectator.get_transform()
    location = transform.location
    position_text = f"Spectator Location: x={location.x:.2f}, y={location.y:.2f}, z={location.z:.2f}"
    position_label.config(text=position_text)
    
    # Schedule this function to run again after 1000 ms (1 second)
    root.after(1000, update_position)
    
# Start the periodic update of position
update_position()

# Start the Tkinter event loop
root.mainloop()


# right turn route: 1: 386, -198, 0.0, 2: -20, 6, z=12.55, 3: x= -414.85, y=27, z=2, 4: 57, y=38, z=13, 5: x=15.45, y=88.15, z=1.98

# left turn route: 
# 1: (x: 212, y: -392, z: 0)
# 2: (x: -417, y: 6,  z: 2)
# 3: (x: 140, y: 220, z: 2)


In [42]:
# route planner

from agents.navigation.global_route_planner import GlobalRoutePlanner


carla_map = world.get_map()

# Set up the route planner
grp = GlobalRoutePlanner(carla_map, 2.0)

# Manually define locations (you need to know the coordinates)
locations = [
    carla.Location(x=225, y=-364, z=0),
    carla.Location(x=-20, y=6, z=12.5),
    carla.Location(x=-395, y=37, z=2),
    carla.Location(x=11.2, y=-121, z=2),
    carla.Location(x=225, y=-364, z=0)
]

# locations = [
#     carla.Location(x=223.025146, y=-385.143005, z=0.281942),
#     carla.Location(x=-417, y=6, z=2),
#     carla.Location(x=140, y=220, z=2),
#     carla.Location(x=223.025146, y=-385.143005, z=0.281942),
# ]

# colors = [
#     carla.Color(255, 0, 0, 255),   # Red
#     carla.Color(0, 255, 0, 255),   # Green
#     carla.Color(0, 0, 255, 255),   # Blue
#     carla.Color(255, 255, 0, 255), # Yellow
#     carla.Color(255, 165, 0, 255), # Orange
#     carla.Color(75, 0, 130, 255),  # Indigo
# ]

def compute_route(grp, locations):
    route = []
    for i in range(len(locations) - 1):
        
        start = locations[i]
        end = locations[i + 1]
        
        # Compute the route segment between each pair of locations
        waypoint_start = world.get_map().get_waypoint(start)
        waypoint_end = world.get_map().get_waypoint(end)
        route_segment = grp.trace_route(waypoint_start.transform.location, waypoint_end.transform.location)
        route.extend(route_segment)  # Add the segment to the overall route
    return route

route = compute_route(grp, locations)

# color interpolation
def interpolate_color(start_color, end_color, factor):
    """ Interpolate between two colors. Factor is between 0 and 1. """
    r = start_color.r + factor * (end_color.r - start_color.r)
    g = start_color.g + factor * (end_color.g - start_color.g)
    b = start_color.b + factor * (end_color.b - start_color.b)
    a = factor * 255
    return carla.Color(int(r), int(g), int(b), int(a))

def visualize_route_with_interpolated_colors(world, route):
    start_color = carla.Color(255, 255, 255)  # Red
    end_color = carla.Color(0, 0, 0)    # Blue

    num_waypoints = len(route)

    # Visualize the route using debug drawing
    for i, (waypoint, _) in enumerate(route):
        
        factor = i / (num_waypoints - 1) if num_waypoints > 1 else 0
        color = interpolate_color(start_color, end_color, factor)
        print(factor, color)
        
        world.debug.draw_string(waypoint.transform.location + carla.Location(z=0.5), 'O', color=color, life_time=20.0)

# Assuming 'world' and 'route' are already defined
visualize_route_with_interpolated_colors(world, route)

0.0 Color(255,255,255,0)
0.000513083632632119 Color(254,254,254,0)
0.001026167265264238 Color(254,254,254,0)
0.0015392508978963571 Color(254,254,254,0)
0.002052334530528476 Color(254,254,254,0)
0.002565418163160595 Color(254,254,254,0)
0.0030785017957927143 Color(254,254,254,0)
0.0035915854284248334 Color(254,254,254,0)
0.004104669061056952 Color(253,253,253,1)
0.004617752693689072 Color(253,253,253,1)
0.00513083632632119 Color(253,253,253,1)
0.005643919958953309 Color(253,253,253,1)
0.0061570035915854285 Color(253,253,253,1)
0.006670087224217547 Color(253,253,253,1)
0.007183170856849667 Color(253,253,253,1)
0.007696254489481785 Color(253,253,253,1)
0.008209338122113904 Color(252,252,252,2)
0.008722421754746024 Color(252,252,252,2)
0.009235505387378143 Color(252,252,252,2)
0.009748589020010261 Color(252,252,252,2)
0.01026167265264238 Color(252,252,252,2)
0.0107747562852745 Color(252,252,252,2)
0.011287839917906618 Color(252,252,252,2)
0.011800923550538737 Color(251,251,251,3)
0.0123140

## object oriented model tester here

In [28]:
spectator = world.get_spectator()
def on_collision(event, vehicle, vehicle_id, respawn=False):
    
    print("Collision detected: Actor %s (vehicle %s) hit %s" % (event.other_actor.type_id, vehicle_id, event.actor.type_id))
    # Stop the vehicle (or apply any other logic you deem necessary)
    vehicle.destroy()
    vehicle.spawn(vehicle.spawned_at) #respawn the vehicle
    
    # may need to set up autopilot but idk
    if vehicle.on_autopilot:
        vehicle.activate_autopilot()
        
# Function to create the main UI window
def create_main_window():
    root = tk.Tk()
    root.title("Spectator Position in CARLA")
    label = tk.Label(root, text="Waiting for CARLA data...")
    label.pack(padx=50, pady=50)
    return root, label

root, control_label = create_main_window()

# def update_position():
#     # Assume 'client' and 'world' are already set up for CARLA
#     spectator = world.get_spectator()
#     transform = spectator.get_transform()
#     location = transform.location
#     position_text = f"Spectator Location: x={location.x:.2f}, y={location.y:.2f}, z={location.z:.2f}"
#     position_label.config(text=position_text)
    
#     Schedule this function to run again after 1000 ms (1 second)
#     root.after(1000, update_position)

def print_control(control, label):
    control_text = (f"Predicted Control - Steering: {control.steer: .3f}, Throttle: {control.throttle: .3f}, Brake: {control.brake:.3f}")
    label.config(text=control_text)
    
# Start the periodic update of position
# update_position()


try:
    vehicle = Vehicle(world=world, vehicle_id=0, vehicle_name="right_car", collision_callback=on_collision)
#     vehicle2= Vehicle(world=world, vehicle_id=1, vehicle_name="left_car", collision_callback=on_collision)

    vehicle.spawn(spawn_point=7)
#     vehicle2.spawn(spawn_point=2)
    world.tick()

#     vehicle.activate_autopilot()
#     vehicle2.activate_autopilot()

    while True:
        world.tick()
        # vehicle.run_step(route=right_locations)
        # vehicle2.run_step(route=left_locations)
        vehicle.run_predict_step(callback=print_control, label=control_label)
        root.update_idletasks()
        root.update()

        if keyboard.is_pressed('q'):  # Check if 'q' is pressed
            print("Quitting simulation.")
            break
        
finally:
    
    settings.synchronous_mode = False
    settings.fixed_delta_seconds = None
    world.apply_settings(settings)
    traffic_manager.set_synchronous_mode(False)
    root.destroy()
    
    clear_world()
    cv2.destroyAllWindows()
    vehicles = []

spawning vehicle:  0
Transform(Location(x=225.252029, y=-364.109375, z=0.281942), Rotation(pitch=0.000000, yaw=-0.295319, roll=0.000000))
Quitting simulation.


In [23]:

def print_vehicle_controls(vehicle):
    control = vehicle.get_control()
    normalized_steer = (control.steer + 1) /2
    normalized_throttle = (control.throttle)
    normalized_brake = (control.brake)
    print("Steer: {:.3f}, Throttle: {:.3f}, Brake: {:.3f}".format(normalized_steer, normalized_throttle, normalized_brake))




def on_collision(event, vehicle, respawn=False):
    
    print("Collision detected: Actor %s hit %s" % (event.other_actor.type_id, event.actor.type_id))
    # Stop the vehicle (or apply any other logic you deem necessary)
    vehicle.destroy()

def respawn_car(spawnPoint=None):
    global vehicle, agent, collision_sensor, camera_sem, camera_rgb, camera_data
    # Destroy existing actors if they exist
    
    for actor in world.get_actors().filter('*vehicle*'):
        actor.destroy()
    for sensor in world.get_actors().filter('*sensor*'):
        sensor.destroy()
    if vehicle is not None:
        vehicle.destroy()
        vehicle = None
    if collision_sensor is not None:
        collision_sensor.destroy()
        collision_sensor = None
    if camera_sem is not None:
        camera_sem.destroy()
        camera_sem = None
    if camera_rgb is not None:
        camera_rgb.destroy()
        camera_rgb = None
    
    
    # if spawnpoint is unspecified, spawn new vehicle at random place
    
    if (spawnPoint):
        random_spawnpoint = spawnPoint
    
    print("Vehicle spawned at spawn point: ", random_spawnpoint)
    random_spawnpoint = random.randint(0, 10)  # Both 1 and 10 are included
    vehicle = spawn_vehicle(random_spawnpoint)
    
    # sometimes the vehicle isn't spawned correctly and 
    # other threads try to read it and then it gets fucked
    if vehicle is None:
        raise Exception as e:
            print("Failed to spawn vehicle")
        time.sleep(1)

    # delay for a second just in case
    time.sleep(1)
    
    # Setup collision sensor
    collision_sensor = setup_collision_sensor(vehicle, on_collision)

    # Setup cameras
    camera_sem, camera_rgb, camera_data = setup_cameras(vehicle)

    # Setup or reset the agent
    agent = setAgent(vehicle)

    print("New vehicle and sensors spawned. Continuing simulation...")

In [10]:
# helper functions for the main loop

def get_random_waypoint(world):
    map = world.get_map()
    # Generate waypoints every 2 meters
    waypoints = map.generate_waypoints(distance=2.0)
    return random.choice(waypoints)

def distance(location1, location2):
    return math.sqrt((location1.x - location2.x)**2 + (location1.y - location2.y)**2 + (location1.z - location2.z)**2)

# returns the openCV objects of rgb/segmented images from CARLA
def grab_pictures():
#     print_vehicle_controls(vehicle)
        
#     steering_wheel_direction = control.steer
    rgb_im = camera_data['rgb_image']
    sem_im = camera_data['sem_image']

    # display images
    im_h = cv2.hconcat([rgb_im,sem_im])
    cv2.imshow('2 cameras', im_h)
    
    return rgb_im, sem_im

In [9]:
# initialize global variables and create the car
    
vehicle = None
agent = None
collision_sensor = None
camera_sem = None
camera_rgb = None
camera_data = None
start_point = None

respawn_car()

10
New vehicle and sensors spawned. Continuing simulation...


In [None]:
# function to capture CARLA's segmented images.
# as of now, ./out_sem/rgb and ./out_sem/sem folder for it to export properly.


# TODO: will refactor the code to make it object-oriented and implement thread locks
last_capture_time = time.time()
interval = 7.5 # set interval to 10 seconds
sharp_steer_interval = 0.1
light_steer_interval = 0.75


    
current_waypoint = None

if vehicle is None:
    respawn_car()
while True:
    
    # advance the world
    world.tick()

    if vehicle is not None and agent is not None:
        try:
            control = agent.run_step()

            vehicle.apply_control(control)
            current_time = time.time()
            control = vehicle.get_control()

            # note try to factor this out later lol
            # on light turn, delay 0.5
            if abs(control.steer * 100) >= 5 and abs(control.steer * 100) < 10:


                if current_time - last_capture_time >= light_steer_interval:
                    rgb_im, sem_im = grab_pictures()
                    # grab the time
                    time_grab = time.time_ns()
                    # normalize controls to between 0 and 100, with 50 being straight or nothing
                    normalized_steer = (control.steer + 1) /2
                    normalized_throttle = (control.throttle)
                    normalized_brake = (control.brake)
                    cv2.imwrite('out_sem/rgb/%06d_%.3f_%.3f_%.3f.png' % (time_grab, normalized_steer, normalized_throttle, normalized_brake), rgb_im)
                    cv2.imwrite('out_sem/sem/%06d_%.3f_%.3f_%.3f.png' % (time_grab, normalized_steer, normalized_throttle, normalized_brake), sem_im)
                    last_capture_time = current_time  # Update time

            # on sharp turn, delay 0.1
            elif abs(control.steer * 100) >= 10:
                if current_time - last_capture_time >= sharp_steer_interval:
                    # normalize controls to between 0 and 100, with 50 being straight or nothing
                    rgb_im, sem_im = grab_pictures()
                    # grab the time
                    time_grab = time.time_ns()
                    normalized_steer = (control.steer + 1) /2
                    normalized_throttle = (control.throttle)
                    normalized_brake = (control.brake)
                    cv2.imwrite('out_sem/rgb/%06d_%.3f_%.3f_%.3f.png' % (time_grab, normalized_steer, normalized_throttle, normalized_brake), rgb_im)
                    cv2.imwrite('out_sem/sem/%06d_%.3f_%.3f_%.3f.png' % (time_grab, normalized_steer, normalized_throttle, normalized_brake), sem_im)
                    last_capture_time = current_time  # Update time


            elif current_time - last_capture_time >= interval:        
                rgb_im, sem_im = grab_pictures()

                time_grab = time.time_ns()

                normalized_steer = (control.steer + 1) /2
                normalized_throttle = (control.throttle)
                normalized_brake = (control.brake)
                cv2.imwrite('out_sem/rgb/%06d_%.3f_%.3f_%.3f.png' % (time_grab, normalized_steer, normalized_throttle, normalized_brake), rgb_im)
                cv2.imwrite('out_sem/sem/%06d_%.3f_%.3f_%.3f.png' % (time_grab, normalized_steer, normalized_throttle, normalized_brake), sem_im)        
                # update time
                last_capture_time = current_time
            if cv2.waitKey(1) == ord('q'):
                break
            if agent.done():
                while True:
                    new_waypoint = get_random_waypoint(world)
                    if current_waypoint is None or distance(current_waypoint.transform.location, new_waypoint.transform.location) > 50:  # Check if new waypoint is at least 50 meters away
                        break
                print("reached waypoint, heading to: ", new_waypoint) 
                agent.set_destination(new_waypoint.transform.location)
                current_waypoint = new_waypoint
        except Exception as e:
            print("Caught RuntimeError while trying to control the vehicle:", e)
            time.sleep(1)
    

clear_world()
cv2.destroyAllWindows()

Caught RuntimeError while trying to control the vehicle: name 'control' is not defined
Caught RuntimeError while trying to control the vehicle: name 'control' is not defined
Caught RuntimeError while trying to control the vehicle: name 'control' is not defined
Caught RuntimeError while trying to control the vehicle: name 'control' is not defined
Caught RuntimeError while trying to control the vehicle: name 'control' is not defined
Caught RuntimeError while trying to control the vehicle: name 'control' is not defined
Caught RuntimeError while trying to control the vehicle: name 'control' is not defined
Caught RuntimeError while trying to control the vehicle: name 'control' is not defined


In [11]:
from predict_control import predict


# if vehicle is None:
respawn_car()
    
# see if you can keep a '3 second log' and then write to it when you crash
while True:
    
    world.tick()
    try:
        rgb_im, sem_im = grab_pictures()

        prediction = predict(sem_im, CARLA=True)

        new_steering = 2 * (prediction[0] - 0.5)
        print(f"steer: {new_steering}, throttle: {prediction[1]}, brake: {prediction[2]}")
        control = carla.VehicleControl(steer=new_steering, throttle=prediction[1], brake=0.0)
        vehicle.apply_control(control)
        current_time = time.time()
    except Exception as e:
        print(e)
        time.sleep(1)
        respawn_car()
#     control = vehicle.get_control()
    
    if cv2.waitKey(1) == ord('q'):
        break
clear_world()
cv2.destroyAllWindows()

FileNotFoundError: [Errno 2] No such file or directory: 'C:\\Users\\Clarence\\Documents\\Coding\\cs141\\researchProject\\selfDrivingCar\\Self_Driving_Car\\models\\driver_model-3.pt'

In [18]:
#grab a snapshot from the camera an show in a pop-up window
img = camera_data['image']
cv2.imshow('RGB Camera',img)
cv2.waitKey(0)

113

In [82]:
# clean up after yourself

# camera.stop() # this is the opposite of camera.listen
for actor in world.get_actors().filter('*vehicle*'):
    actor.destroy()
for sensor in world.get_actors().filter('*sensor*'):
    sensor.destroy()

In [7]:
len(spawn_points)

10422

In [8]:
start_point = spawn_points[0]
spectator = world.get_spectator()
spectator.set_transform(start_point)

In [12]:
spectator.set_transform(carla.Transform(carla.Location(x=-1085.286377, y=3112.225830, z=356.060608), carla.Rotation(pitch=1.648070, yaw=20.234367, roll=0.000000)))

In [11]:
print(start_point)

Transform(Location(x=-1085.286377, y=3112.225830, z=356.060608), Rotation(pitch=1.648070, yaw=20.234367, roll=0.000000))
