In [23]:
import tkinter as tk
from tkinter import messagebox
import lib_scen_intersection as lsi
import lib_map_draw as dr
import lib_spawn as spb
import lib_traffic as trf
import lib_sensors as sen
import carla
from importlib import reload
from collections import defaultdict
import json
reload(lsi)
reload(dr)
reload(spb)
reload(trf)
reload(sen)

class TrafficSimulatorGUI:
    def __init__(self, root, spacing=10):
        self.root = root
        self.root.title("Junction Scenarios")
        self.spacing = spacing
        self.world, self.crossings = self.connect_sim();
        
        self.spawns = None    #list of waypoint couples surround sleected junction
        self.gr_pairs = None  #dictionary (map) of wp couples where the key is the common destination waypoint [1] = [(a,1), (b,1), (c,1)]
        self.crossing_index = 0
        self.spawn_index = 0

        self.ego_vehicle = None #(vehicle, wp)
        self.current_spawned = None #(vehicle, wp)
        self.ego_spawn_location = None #spawn coordinates of current ego vehicle
        self.npc_vehicles = []
        self.threads = []
        self.tol_behind_wp = 50 #def 100
        self.draw_time = 2

        self.obst_sen_entry = None
        self.obst_sen_val = None

        # sim reset button
        tk.Button(root, text="Reset Simulator", command=self.reset_simulator).grid(row=0, column=0, columnspan=4, pady=self.spacing)

        # init button for junction
        self.create_navigation_section("Junction", self.crossings,
                                       "Prev Junction", "Next Junction", 1)

        # Init button for spawn
        self.create_navigation_section("Spawn", self.spawns,
                                       "Prev Spawn", "Next Spawn", 2)

        # "Spawn vehicle" Button 
        tk.Button(root, text="Spawn vehicle", command=self.spawn_vehicle).grid(row=3, column=0, pady=self.spacing)
        tk.Button(root, text="Set Current Spawn as Ego", command=self.set_ego).grid(row=3, column=1, pady=self.spacing)
        tk.Button(root, text="Spawn on common routes", command=self.spawn_common_route).grid(row=3, column=2, padx=self.spacing)

        tk.Button(root, text="Focus EGO", command=self.focus_ego).grid(row=4, column=0, padx=self.spacing, pady=self.spacing)
        # DRAW AND SPAWN ON COMMON ROUTES
        tk.Button(root, text="Draw common routes", command=self.draw_common_route).grid(row=4, column=1, padx=self.spacing)
        tk.Button(root, text="Fill Junction", command=self.fill_junction).grid(row=4, column=2, padx=self.spacing, pady=self.spacing)

        # Selection menu for ego agent profile
        tk.Label(root, text="Ego Agent Profile:").grid(row=5, column=0, padx=self.spacing, pady=self.spacing)
        self.spawn_profile = tk.StringVar(value="Select")
        spawn_menu = tk.OptionMenu(root, self.spawn_profile, "Cautious", "Normal", "Aggressive",
                                   command=self.display_spawn_profile)
        spawn_menu.grid(row=5, column=1, padx=self.spacing, pady=self.spacing)
        
        #checkbox SKIP TRAFFIC LIGHT
        self.ignore_traffic_lights = tk.BooleanVar(value=True)
        tk.Checkbutton(root, text="Skip Semaphore", variable=self.ignore_traffic_lights).grid(row=5, column=2, padx=self.spacing)

        # Selection menu for traffic agent profile
        tk.Label(root, text="Traffic Agent Profile:").grid(row=6, column=0, padx=self.spacing, pady=self.spacing)
        self.traffic_profile = tk.StringVar(value="Select")
        traffic_menu = tk.OptionMenu(self.root, self.traffic_profile, "Cautious", "Normal", "Aggressive",
                                     command=self.display_traffic_profile)
        traffic_menu.grid(row=6, column=1, padx=self.spacing, pady=self.spacing)

        tk.Label(root, text="Obstacle Tolerance (in meters):").grid(row=6, column=2, pady=self.spacing)
        self.obst_sen_entry = tk.Entry(root, width=4) 
        self.obst_sen_entry.insert(0, "5") 
        self.obst_sen_entry.grid(row=6, column=3, pady=self.spacing)
        
        #generate all scenarios
        tk.Button(root, text="Generate all scenarios", command=self.generate_all_scenarios).grid(row=9, column=0, padx=self.spacing)
        
        #generate all scenarios
        tk.Button(root, text="Uploas scenario", command=self.generate_all_scenarios).grid(row=9, column=1, padx=self.spacing)
        
        #start simulation button
        tk.Button(root, text="Start simulation", command=self.start_simulation).grid(row=10, column=0, columnspan=4, pady=self.spacing)

    #handler for navigation spawn and junction navigation buttons
    def create_navigation_section(self, label, array, left_text, right_text, row):
        
        tk.Label(self.root, text=f"{label} attuale:").grid(row=row, column=0, padx=self.spacing, pady=self.spacing)
        
        #First boot
        if label == "Junction":
           display_label = tk.Label(self.root, text=f"{self.crossing_index + 1}/{len(self.crossings)}")
           
            # for each junction, retrive the list of surrounding waypoint couples. Each couple is indexed by self.crossing_index
           self.gr_pairs = lsi.group_pairs_by_second_waypoint(self.crossings[self.crossing_index][1].get_waypoints(carla.LaneType.Driving) )
           self.spawns = spb.list_all_behind_wp_couple(self.world, self.gr_pairs, self.tol_behind_wp)
           
           dr.focus_above(self.world, self.spawns[0][0].location, 60, -90)
           self.draw_common_route()
            
        elif label == "Spawn":
            display_label = tk.Label(self.root, text=f"{self.spawn_index + 1}/{len(self.spawns)}")

            #retrieve vehicle at current position
            self.current_spawned = spb.get_vehicle_at_spawn_point(self.spawns[self.spawn_index], self.npc_vehicles, 3)
            dr.focus_above(self.world, self.spawns[self.spawn_index][0].location, 60, -90)
            self.draw_common_route()
            
        display_label.grid(row=row, column=1, padx=self.spacing, pady=self.spacing)
        
        #function activated by navigation buttons
        def update_label(direction):
            nonlocal display_label
            if label == "Junction":
                dr.pair_counter = 0
                self.crossing_index = (self.crossing_index + direction) % len(self.crossings)
                display_label.config(text=f"{self.crossing_index  + 1 }/{len(self.crossings)}")#self.crossings[self.crossing_index][0]) #hunction contiene coppie id, coordinate
                
                # for each junction, retrive the list of surroinding waypoint couples. Each couple is indexed by self.crossing_index
                self.gr_pairs = lsi.group_pairs_by_second_waypoint(self.crossings[self.crossing_index][1].get_waypoints(carla.LaneType.Driving) )
                self.spawns = spb.list_all_behind_wp_couple(self.world, self.gr_pairs, self.tol_behind_wp) #last = distance
                
                dr.focus_above(self.world, self.spawns[0][0].location, 60, -90)
                self.draw_common_route()
                self.spawn_index = 0
                
            elif label == "Spawn":
                self.spawn_index = (self.spawn_index + direction) % len(self.spawns)
                display_label.config(text=f"{self.spawn_index + 1}/{len(self.spawns)}")

                #retrieve vehicle at current position
                self.current_spawned = spb.get_vehicle_at_spawn_point(self.spawns[self.spawn_index], self.npc_vehicles, 3)
                dr.focus_above(self.world, self.spawns[self.spawn_index][0].location, 60, -90)
                self.draw_common_route()

        tk.Button(self.root, text=left_text, command=lambda: update_label(-1)).grid(row=row, column=2, padx=self.spacing, pady=self.spacing)
        tk.Button(self.root, text=right_text, command=lambda: update_label(1)).grid(row=row, column=3, padx=self.spacing, pady=self.spacing)
    
    #reset button and cleans simulator
    def reset_simulator(self):
        # reset indexes of junctions and spawn point
        self.crossing_index = 0
        self.spawn_index = 0

        self.ego_vehicle = None
        self.current_spawned = None
        self.npc_vehicles = []

        # Selection reset
        self.spawn_profile.set("Select")
        self.traffic_profile.set("Select")

        # checkbox reset
        self.ignore_traffic_lights.set(True)

        #delete actors from sim
        lsi.reset_sim(self.world)

        trf.stop_threads();
        sen.reset_min_dist()
        
        # init button for junction
        self.create_navigation_section("Junction", self.crossings,
                                       "Prev Junction", "Next Junction", 1)
        # Init button for spawn
        self.create_navigation_section("Spawn", self.spawns,
                                       "Prev Spawn", "Next Spawn", 2)
        
        messagebox.showinfo("Reset Completed", "All actors and sensors destroyed.")

    #BUTTON SPAWN EGO VEHICLE
    def spawn_vehicle(self):
        #traffic_lights_status = "Traffic Light ignored" if self.ignore_traffic_lights.get() else "Traffic Light observed"
        
        (v, w, s) = spb.spawn_vehicle(self.world, self.spawns[self.spawn_index])
        if v is not None:
           self.current_spawned =  (v, w, s)
           self.npc_vehicles.append((v, w, s))
           print(f"Spawned {v}")
           #self.ego_spawn_location = self.spawns[self.spawn_index][0]
        else: 
           messagebox.showinfo("INFO", f"Current spawn location already busy")
           
    #BUTTON SET EGO VEHICLE
    def set_ego(self):

        #check if there is a vehicle in current focused position
        if self.current_spawned is None:
           messagebox.showinfo("INFO", "No vehicle spawned at current location!!")
        else:
            if self.ego_vehicle is not None:
               self.npc_vehicles.append(self.ego_vehicle)
            self.ego_vehicle = self.current_spawned
            self.npc_vehicles = spb.recompute_npc_list(self.npc_vehicles, self.current_spawned)
            self.ego_spawn_location = self.spawns[self.spawn_index]
            print(self.ego_vehicle[0])

    def focus_ego(self):
        if  self.ego_vehicle is not None:
            v, w_dest, s = self.ego_vehicle
            dr.focus_above(self.world, v.get_transform().location, 60, -90)
        else:
            messagebox.showinfo("INFO", "No ego vehicle set yet.")
        #dr.draw_route(self.world, spb.get_wp_from_vh(self.world, v), w_dest, '<->', 255, 255, 0, 300)
    
    #spawn vehicles on all junction edges
    def fill_junction(self):
        self.npc_vehicles += spb.spawn_traffic(self.world, self.spawns) 

    def display_spawn_profile(self, selection):
        print("Spawn agent profile", f"Selected profile for EGO: {selection}")

    def display_traffic_profile(self, selection):
        print("Traffic profile", f"Selected profile for Traffic: {selection}")

    #SIM CONNECTION 
    def connect_sim(self):
        client = carla.Client('localhost', 2000)
        client.set_timeout(5.0)
        world = client.get_world()
        bp_lib = world.get_blueprint_library() 
        map = world.get_map()
        
        # wp list with 2 mt offset
        waypoints = map.generate_waypoints(2.0)
        
        # junction array (id, oggetto_incrocio)
        unique_junctions = []
        
        # retrieving junction by adjacent waypoints
        #!!: different waypoints may belong to same junction
        for waypoint in waypoints:
            if waypoint.is_junction:
                junction = waypoint.get_junction()
                if not(any(u[0] == junction.id for u in unique_junctions)):
                    unique_junctions.append( (junction.id, junction) )
        
        # Print junction number (9 distinct in town 10)
        print(f"Junctions found: {len(unique_junctions)}\n") 
        return ( world, sorted(unique_junctions, key=lambda pair: pair[0]) )
    
    #given current spawn and dest_wp, draws all routes starting from a spawn towards dest_wp
    def draw_common_route(self):
        wp2 = self.spawns[self.spawn_index][1]
        key_wp2 = (round(wp2.transform.location.x, 3), round(wp2.transform.location.y, 3))
        dr.draw_intersection(self.world, self.gr_pairs[key_wp2], self.draw_time)
        dr.draw_spawn(self.world, self.spawns[self.spawn_index][0], self.draw_time, f"SP_{self.crossing_index + 1}_{self.spawn_index + 1}", 0, 255, 0)

    #spawns vehicle on all routes having the same wp as destination
    def spawn_common_route(self):
        wp2 = self.spawns[self.spawn_index][1]
        key_wp2 = (round(wp2.transform.location.x, 3), round(wp2.transform.location.y, 3))
        #current_sp = self.spawns[self.spawn_index][0]
        
        for sp, wp_dest in self.spawns:
            wp_d = (round(wp_dest.transform.location.x, 3), round(wp_dest.transform.location.y, 3))
            if key_wp2 == wp_d:
               v, w, s = spb.spawn_vehicle(self.world, (sp, wp_dest))
               if v is not None:
                   self.npc_vehicles.append((v, w, s))

    #WIP
    def generate_all_scenarios(self):
        #focused on one junction
        grouped_pairs = lsi.group_pairs_by_second_waypoint(self.crossings[self.crossing_index][1].get_waypoints(carla.LaneType.Driving) )
        #self.spawns = spb.list_all_behind_wp_couple(self.world, self.gr_pairs, self.tol_behind_wp)
        #continua da qui
        idx = 0
        scenarios = defaultdict(list)
        for wp2_coords, pairs in grouped_pairs.items():
            i = 0
            if len(pairs) > 1:
               for (wp1, wp2) in pairs:
                   if i < 2:
                       scenarios[idx].append( (spb.get_spawn_behind_wp(self.world, wp1, self.tol_behind_wp, 0), wp2) )
                       i = i + 1
               #create another scenario with inverted couples of spwan and wp
               scenarios[idx + 1].append( (scenarios[idx][1], scenarios[idx][0]) )
               idx = idx + 2
        print(f"Generating scenarios...{len(scenarios)}")

    ##### START SIMULATION FUNCTION AVVIO SIMULAZIONE #####
    def start_simulation(self):
        # Raccoglie i parametri selezionati
        crossing = self.crossings[self.crossing_index]
        
        obst_sen_val = self.obst_sen_entry.get()
        spawn_profile = self.spawn_profile.get().lower()
        traffic_profile = self.traffic_profile.get().lower()
        traffic_lights_val = self.ignore_traffic_lights.get()
        traffic_lights_msg = "Semaphores skipped" if traffic_lights_val else "Semaphores observed"
        
        # check if profiles are set
        try:
            profiles = ["cautious", "normal", "aggressive"]
            msg = ""
            
            if spawn_profile not in profiles:
                msg = "Choose a driving profile for Ego vehicle!!"
                raise AttributeError
            if len(self.npc_vehicles) != 0 and traffic_profile not in profiles:
                msg = "Choose a driving profile for Traffic!!"
                raise AttributeError
            if self.ego_vehicle is None:
                msg = "No Ego vehicle spawned!!"
                raise AttributeError

            # convert treshold from string to float
            obst_sen_val = float(obst_sen_val)
            
        except ValueError:
            messagebox.showerror("ERROR", "Insert a valid number for obstacle sensor!!")
            return
        except AttributeError:
            messagebox.showinfo("ERROR: ", msg)
            return

        #retrieving spawn location for ego and first npc
        ego_spawn_loc = self.ego_spawn_location[0].location
        tmp_ego = self.ego_spawn_location[0].rotation
        ego_pitch, ego_yaw, ego_roll = tmp_ego.pitch, tmp_ego.yaw, tmp_ego.roll
        
        ego_dest = self.ego_spawn_location[1].transform.location
        tmp_ego_dest = self.ego_spawn_location[1].transform.rotation
        ego_dest_pitch, ego_dest_yaw, ego_dest_roll = tmp_ego_dest.pitch, tmp_ego_dest.yaw, tmp_ego_dest.roll
        
        print(f"Stampo destinazione: {self.ego_spawn_location[1].transform}")
        
        npc_spawn_loc = carla.Location(x=0.0, y=0.0, z=0.0)
        npc_dest = carla.Location(x=0.0, y=0.0, z=0.0)
        npc_dest_pitch = npc_dest_yaw = npc_dest_roll = npc_pitch = npc_yaw = npc_roll = 0
        
        #!!takes only first NPC in list, always check if it's involved in your use case scenario
        if len(self.npc_vehicles) != 0:
           print(f"Lunghezza npc list: {len(self.npc_vehicles)}")
           npc_spawn_loc = self.npc_vehicles[0][2].location #each vahicle has ( vehicle, dest_wp, spawn )
           tmp_npc = self.npc_vehicles[0][2].rotation
           npc_pitch, npc_yaw, npc_roll = tmp_npc.pitch, tmp_npc.yaw, tmp_npc.roll
           
           npc_dest = self.npc_vehicles[0][1].transform.location
           tmp_npc_dest = self.npc_vehicles[0][1].transform.rotation
           npc_dest_pitch, npc_dest_waw, npc_dest_roll = tmp_npc_dest.pitch, tmp_npc_dest.yaw, tmp_npc_dest.roll
           
        summary = (f"Simulation started with following parameters:\n"
                   f"- Selected Junction: {self.crossing_index}\n\n"
                   f"- Selected EGO spawn: \n{ego_spawn_loc.x,ego_spawn_loc.y,ego_spawn_loc.z}\n\n"
                   f"- Selected EGO destination: \n{ego_dest.x,ego_dest.y,ego_dest.z}\n\n"
                   f"- Selected NPC spawn: \n{npc_spawn_loc.x,npc_spawn_loc.y,npc_spawn_loc.z}\n\n" 
                   f"- Selected NPC destination: \n{npc_dest.x,npc_dest.y,npc_dest.z}\n\n" 
                   f"- Distance obstacle tolerance: {obst_sen_val} meters\n"
                   f"- Agent ego profile: {spawn_profile}\n"
                   f"- Agent traffic profile: {traffic_profile}\n"
                   f"- Ignore Traffic light : {traffic_lights_msg}")
        
        messagebox.showinfo("Start Simulation", summary)

        data_to_save = {
        "junction_id": self.crossing_index + 1,
        "ego_spawn": {
                    "x": ego_spawn_loc.x,
                    "y": ego_spawn_loc.y,
                    "z": ego_spawn_loc.z,
                    "pitch": ego_pitch, 
                    "yaw": ego_yaw, 
                    "roll": ego_roll
                    },
        "ego_dest": {
                    "x": ego_dest.x,
                    "y": ego_dest.y,
                    "z": ego_dest.z,
                    "pitch": ego_dest_pitch, 
                    "yaw": ego_dest_yaw, 
                    "roll": ego_dest_roll
                    },
        "npc_spawn": {
                    "x": npc_spawn_loc.x,
                    "y": npc_spawn_loc.y,
                    "z": npc_spawn_loc.z,
                    "pitch": npc_pitch, 
                    "yaw": npc_yaw, 
                    "roll": npc_roll
                    },
        "npc_dest": {
                    "x": npc_dest.x,
                    "y": npc_dest.y,
                    "z": npc_dest.z,
                    "pitch": npc_dest_pitch, 
                    "yaw": npc_dest_yaw, 
                    "roll": npc_dest_roll
                    },
        "ego_agent": spawn_profile,
        "traffic_agent": traffic_profile
        }
        
        # save data to json file
        with open("sim_data.log", "a") as file:
            file.write(str(data_to_save) + '\n')
                #json.dump(data_to_save, file, indent=4)
        
        #########################   START SIMULATION MAIN SECTION   #########################
        threads = []
        #reset stop flag for threads
        trf.reset_stop_event()
        #reset min dist for obstacle sensor
        sen.reset_min_dist()
        
        #START TRAFFIC IF PRESENT
        if len(self.npc_vehicles) != 0:
            self.threads += trf.start_vehicle_list(self.world, self.npc_vehicles, 'NPC', visible = 1, behaviour = traffic_profile, disable_trf = traffic_lights_val) 
        
        #START EGO VEHICLE SPAWNED AT self.spawns[self.spawn_index] self.ego_vehicle
        self.threads.append( trf.start_vehicle(self.world, self.ego_vehicle, 'EGO_VEHICLE', visible = 2, behaviour = spawn_profile, disable_trf = traffic_lights_val) )
        #EGO VEHICLE SENSORS
        end_route, dist_check, obs_dist, col_check, max_speed = sen.attach_sensors(self.world, self.ego_vehicle[0], 30, obst_sen_val) #last two: max_dist, tol_dist

        print(f"Sim results, End_route: {end_route}, distance_respected: {dist_check}, obstacle_dist: {obs_dist}, has_collided: {col_check}, max_speed: {max_speed}")
        
        self.npc_vehicles = []
        self.ego_vehicle = None #(vehicle, wp)
        self.current_spawned = None
        self.ego_spawn_location = None

# START INTERFACE
root = tk.Tk()
app = TrafficSimulatorGUI(root, spacing=7) #def 7
root.mainloop()


Junctions found: 9

Nessun punto di spawn trovato per Waypoint(Transform(Location(x=-67.118195, y=-61.693558, z=0.000000), Rotation(pitch=0.000000, yaw=-359.403259, roll=0.000000)))
Nessun punto di spawn trovato per Waypoint(Transform(Location(x=-67.154648, y=-58.193748, z=0.000000), Rotation(pitch=0.000000, yaw=-359.403259, roll=0.000000)))
Spawned vehicle at: Transform(Location(x=-45.235935, y=-36.500095, z=0.600000), Rotation(pitch=0.000000, yaw=-89.567680, roll=0.000000))
Spawned Actor(id=609, type=vehicle.lincoln.mkz_2020)
Actor(id=609, type=vehicle.lincoln.mkz_2020)
Spawned vehicle at: Transform(Location(x=-45.235935, y=-36.500095, z=0.600000), Rotation(pitch=0.000000, yaw=-89.567680, roll=0.000000))
Spawned vehicle at: Transform(Location(x=-18.782679, y=-64.708092, z=0.900005), Rotation(pitch=0.000000, yaw=-179.403244, roll=0.000000))
Spawn agent profile Selected profile for EGO: Aggressive
Traffic profile Selected profile for Traffic: Normal
Stampo destinazione: Transform(Locat