In [2]:
import os
import sys
import json
import subprocess
import shutil
import random

if 'SUMO_HOME' in os.environ:
    sys.path.append(os.path.join(os.environ['SUMO_HOME'], 'tools'))
import sumolib
import importlib
from pprint import pprint

# Make the sibling "scripts" directory importable.
# A notebook has no real __file__, so the kernel's working directory is the anchor:
# scripts/ is expected next to the directory the notebook runs in.
current_dir = os.getcwd()
parent_dir = os.path.normpath(os.path.join(current_dir, os.pardir))
scripts_dir = os.path.join(parent_dir, 'scripts')
sys.path.append(scripts_dir)
import vehParameters
import GoogleMapsGeocoding as gmaps
import LLAMAconnect

In [3]:
# Input files shared by the whole notebook: the SUMO network and the additional
# file from which the <parkingArea> elements are read.
net_path = 'osm.net.xml'
add_path = 'osm_stops.add.xml'
# Parsed network; getClosestEdges uses it for coordinate conversion and
# neighbouring-edge lookups.
net = sumolib.net.readNet(net_path)
# Kept as a list because has_parking_spot loops over the parking areas on every call.
parkingAreas = list(sumolib.output.parse(add_path, "parkingArea"))

In [4]:
# Search radius read as a global by getPath (edge / parking-spot lookup) and by getCoords.
# Presumably in metres of network coordinates — TODO confirm.
radius = 150

In [5]:
def flush_print():
    """Blank out the current output line and leave the cursor at its start."""
    # carriage return -> 50 spaces over the old text -> carriage return again
    for chunk in ("\r", " " * 50, "\r"):
        sys.stdout.write(chunk)

In [6]:
def has_parking_spot(lanes, parkingAreas):
    """Return the id of the first parking area located on one of `lanes`.

    A parking area looks like
    <parkingArea id="pa-1046248579#0" lane="-1046248579#0_0" roadsideCapacity="94" length="5.00"/>
    and is matched through its `lane` attribute against each lane's getID().

    Returns the matching parkingArea id, or False when none of the lanes has one.
    """
    lane_ids = [lane.getID() for lane in lanes]
    # parkingAreas order decides which area wins when several match
    matching_ids = (area.id for area in parkingAreas if area.lane in lane_ids)
    return next(matching_ids, False)

In [None]:
def getClosestEdges(lat, lon, radius, maxEdges=10):
    """Return the passenger-accessible edges around a coordinate, nearest first.

    Args:
        lat, lon: geographic position; converted to network x/y with
            net.convertLonLat2XY.
        radius: search radius handed to net.getNeighboringEdges.
        maxEdges: upper bound on the number of edges returned (default 10).
            Pass None to get every matching edge inside the radius.

    Returns:
        A list of edges that allow the 'passenger' class, sorted by distance.
        The list can be empty when edges exist but none allows passenger cars.
        None when no edge at all lies within `radius` (a hint is printed).
    """
    x, y = net.convertLonLat2XY(lon, lat)
    edges = net.getNeighboringEdges(x, y, radius)

    if len(edges) == 0:
        # Nothing in range: the point is outside the network or the radius is too small.
        print(f'No edges found for {lat}, {lon}. Perhaps location is not inside the network, maybe increase the radius?')
        return None

    # getNeighboringEdges yields (edge, distance) pairs; sort on the distance only.
    by_distance = sorted(edges, key=lambda pair: pair[1])

    # Keep only the edges a passenger car is allowed to use.
    closestEdges = [edge for edge, _ in by_distance if edge.allows('passenger')]

    if maxEdges is not None:
        closestEdges = closestEdges[:maxEdges]
    return closestEdges

In [31]:
def getParkingSpot(lat, lon, radius, parkingAreas):
    """Return the id of the parking area nearest to a coordinate.

    The edges around (lat, lon) are scanned from nearest to farthest and the
    first one whose lanes carry a parking area wins. Used to set the stops of
    the vehicles.

    Returns:
        The parkingArea id, or None when no edge within `radius` has one
        (a hint is printed in that case).
    """
    # maxEdges=None: look at every edge inside the radius, not just the nearest few.
    edges = getClosestEdges(lat, lon, radius, maxEdges=None)

    # getClosestEdges returns None when nothing is in range; treat that as "no edges".
    for edge in edges or []:
        parking_spot = has_parking_spot(edge.getLanes(), parkingAreas)
        if parking_spot:
            return parking_spot

    print(f"No parking spot found close to {lat}, {lon}. Perhaps increase the radius?")
    return None

In [29]:
def getPath(location_time_list, parkingAreas, steps_per_stop = 10):
    """Turn one student's schedule into the edge/stop sequence of a trip.

    Only the start edge, the end edge and the intermediate stops (parking
    areas) are produced here; the router finds the actual path between them.

    Args:
        location_time_list: dict mapping an hour to the (lat, lon) the student
            moves to at that hour, in chronological order (see coordsToTrip).
            The first entry is the start location.
        parkingAreas: parking areas searched for every intermediate location.
        steps_per_stop: simulation steps that one hour of the schedule is worth.

    Returns:
        (path, stop_durations). `path` is [home_edge, stop_1, ..., stop_n, home_edge]
        where the stops are parkingArea ids. `stop_durations` has the same
        length; the first and last entries are -1 because they are edges, not stops.
        The last location of the schedule is not looked up: the trip always
        ends on the start edge.

    Raises:
        Exception: if the schedule is empty, no passenger edge is found for the
            start location, or no parking area is found for an intermediate one.
    """
    departures = list(location_time_list.keys())
    locations = list(location_time_list.values())
    if not locations:
        raise Exception("Cannot build a path from an empty schedule")

    # getClosestEdges returns None (nothing in range) or possibly an empty list
    # (no passenger edge); both would otherwise surface as an obscure [0] error.
    home_edges = getClosestEdges(*locations[0], radius)
    if not home_edges:
        raise Exception(f"Could not find a passenger edge for the start location {locations[0]}")
    home = home_edges[0].getID()

    path = [home]
    stop_durations = [-1]  # -1 marks an edge, not a parking spot

    for i in range(1, len(locations)-1):
        # NOTE(review): this is the gap between this stop's hour and the previous
        # one, i.e. the time spent at the *previous* location. If the intent is
        # how long the vehicle stays at this stop, it would be
        # departures[i + 1] - departures[i] — confirm before changing.
        stop_durations.append(steps_per_stop * (departures[i] - departures[i - 1]))

        ps = getParkingSpot(*locations[i], radius, parkingAreas)
        if ps is None:
            print(f"Could not find parking spot for {locations[i]}")
            raise Exception(f"Could not find parking spot for {locations[i]}")
        path.append(ps)

    path.append(home)
    stop_durations.append(-1)

    return path, stop_durations

In [10]:
def pathToXML(path, vehicleID, veh_type, departure_time, stop_durations):
    """Serialise a path into a <trip> element with one <stop> per parking area.

    `path[0]` and `path[-1]` become the trip's from/to edges; every entry in
    between is written as a parkingArea stop whose duration is taken from the
    same index of `stop_durations`.
    """
    origin, destination = path[0], path[-1]
    pieces = [f'<trip id="{vehicleID}" type="{veh_type}" depart="{departure_time}" from="{origin}" to="{destination}">\n']
    # enumerate from 1 so the index lines up with stop_durations
    for position, area in enumerate(path[1:-1], start=1):
        pieces.append(f'\t<stop parkingArea="{area}" duration="{stop_durations[position]}"/>\n')
    pieces.append('</trip>')
    return ''.join(pieces)

Getting the routine for the students described

In [12]:
# It is important not to use short names of institutes, like IB for institute of biology, because the LLM will not be able to assign the students to the correct institute.
# Place categories and the location names handed to the trip generator.
# The 'institute' list is also passed to getCoords later to decide which
# locations are geocoded by name instead of searched nearby.
# NOTE(review): "Institute of Geociences" looks like a typo for "Geosciences", but it is a
# runtime string that appears verbatim in the generated schedules — confirm before changing.
places ={
    'leisure': ["bar", "movie_theater", "shopping_mall"],
    'eating': ["restaurant", "bakery", "cafe"],
    'shopping': ["store", "drugstore", "supermarket"],
    'sports': ["gym"],
    'institute': ["Institute of Biology", "Institute of Computing", "Institute of Geociences", "Institute of Philosophy and Human Sciences", "Institute of Chemistry"],
    'university': ["University of Campinas"],
}

# Free-text description of the student that is sent to the generator.
student_info = "The student is geography student. Today, the student is going to have lunch at a restaurant and the student is going to the supermarket."
# Reload so edits made to LLAMAconnect are picked up without restarting the kernel.
importlib.reload(LLAMAconnect)
# `responses` is indexed and JSON-decoded below, so presumably a list of JSON
# strings, one per trip — confirm in LLAMAconnect.
responses = LLAMAconnect.generate_response_trips(student_info, places, number_of_trips=5) # number of trips defaults to 5
pprint(json.loads(responses[0]))

  0%|          | 0/5 [00:00<?, ?it/s]

Error: The response is missing some hours
Invalid response. Generating a new one.
Error: The response is missing some hours
Invalid response. Generating a new one.
Error: Invalid location generated 'Institute of Geography'
Invalid response. Generating a new one.
Error: Invalid location generated 'Institute of Geography and Human Sciences'
Invalid response. Generating a new one.


100%|██████████| 5/5 [00:04<00:00,  1.03it/s]

{'10': {'activity': 'study', 'location': 'Institute of Geociences'},
 '11': {'activity': 'study', 'location': 'Institute of Geociences'},
 '12': {'activity': 'lunch', 'location': 'restaurant'},
 '13': {'activity': 'leisure', 'location': 'restaurant'},
 '14': {'activity': 'shopping', 'location': 'supermarket'},
 '15': {'activity': 'shopping', 'location': 'supermarket'},
 '16': {'activity': 'sports', 'location': 'gym'},
 '17': {'activity': 'break', 'location': 'cafe'},
 '18': {'activity': 'dinner', 'location': 'restaurant'},
 '19': {'activity': 'study', 'location': 'home'},
 '20': {'activity': 'study', 'location': 'Institute of Geociences'},
 '21': {'activity': 'leisure', 'location': 'home'},
 '22': {'activity': 'leisure', 'location': 'home'},
 '23': {'activity': 'sleep', 'location': 'home'},
 '7': {'activity': 'wake up', 'location': 'home'},
 '8': {'activity': 'study', 'location': 'Institute of Geociences'},
 '9': {'activity': 'study', 'location': 'Institute of Geociences'}}





Generating different houses for students, using a random number generator, in the area where students might live

In [32]:
def getHome(taken=None):
    """Draw a random (lat, lon) home location inside the residential area.

    The position is sampled uniformly, at micro-degree resolution, from the
    axis-aligned bounding box of these corner points:
        top right:    -22.815539, -47.076272
        top left:     -22.818517, -47.079520
        bottom left:  -22.826410, -47.072591
        bottom right: -22.823439, -47.068828

    Args:
        taken: optional iterable of (lat, lon) tuples that must not be returned.
            When omitted, the values of a module-level `coords` dict are used
            if one exists (the previous behaviour); otherwise nothing is
            excluded, so the function also works on a fresh kernel.

    Returns:
        (lat, lon) as floats.
    """
    # Absolute values in micro-degrees; the sign is applied after sampling.
    LAT_MIN = 22815539
    LAT_MAX = 22826410
    LON_MIN = 47068828
    LON_MAX = 47079520

    if taken is None:
        known = globals().get('coords')
        taken = known.values() if isinstance(known, dict) else ()
    taken = list(taken)

    while True:
        lat = - random.randint(LAT_MIN, LAT_MAX) / 10**6
        lon = - random.randint(LON_MIN, LON_MAX) / 10**6
        # Redraw on an exact collision with a location already in use.
        if (lat, lon) not in taken:
            return lat, lon

In [33]:
def getCoords(trip, sulfixo, institutes, start_radius, step_radius, limit_radius, n_options = 3, home_coords = (-22.821026, -47.072902)):
    """Resolve every location named in a schedule to a (lat, lon) pair.

    Args:
        trip: schedule dict keyed by the hour as a string ('7', '8', ...); each
            value has a 'location' entry.
        sulfixo: suffix (state, city, neighbourhood) appended to institute names
            to improve the geocoding search.
        institutes: names that are geocoded directly by address.
        start_radius: initial radius of the nearby search.
        step_radius: amount added to the radius when too few places are found.
        limit_radius: largest radius that will be searched.
        n_options: number of candidate places we ideally want to choose from.
            It only applies while limit_radius is not exceeded; once the limit
            is reached, whatever was found is used.
        home_coords: (lat, lon) assigned to the 'home' location.

    Returns:
        (coords, place): `coords` maps each location name to (lat, lon);
        `place` maps each searched location to the name of the building that
        was picked, or to the previous location's name when nothing was found
        and the student stays where he is.

    Raises:
        Exception: if an institute cannot be geocoded.
    """
    coords = {}
    place = {}
    importlib.reload(gmaps)
    for i in range(len(trip)):
        local = trip[f'{i + 7}']['location']

        # Each distinct location is resolved only once per trip.
        if local in coords:
            continue

        if local == 'home':
            coords['home'] = home_coords
            continue

        if local in institutes:
            result = gmaps.geocode_address(local + ", " + sulfixo)
            if not result:
                raise Exception(f"Could not geocode {local}")
            lat, lon = result[0]['latitude'], result[0]['longitude']
        else:
            print(f"Looking for {local}...", end='', flush=True)

            # The previous hour's location is the reference point of the search.
            # The first hour has no predecessor, so home is used instead.
            if i == 0:
                previous_name = 'home'
                previous = home_coords
            else:
                previous_name = trip[f'{i + 6}']['location']
                previous = coords[previous_name]

            expanded = start_radius
            result = gmaps.find_nearby_building(previous[0], previous[1], local, radius=expanded)

            # If the location is not found or there are fewer options than
            # expected, expand the search radius up to limit_radius.
            while not result or len(result) < n_options:
                if step_radius <= 0 or expanded + step_radius > limit_radius:
                    break
                expanded += step_radius
                result = gmaps.find_nearby_building(previous[0], previous[1], local, radius=expanded)

            flush_print()
            if not result:
                # Nothing within reach: the student stays at the previous location.
                lat, lon = previous[0], previous[1]
                place[f'{local}'] = previous_name
                print(f"Could not find {local} in a radius of {limit_radius} meters. The student will not leave the place he is currently at.")
            else:
                # Randomize the results to avoid always getting the absolute closest
                # building, then record the one that was actually picked.
                random.shuffle(result)
                chosen = result[0]
                place[f'{local}'] = chosen['name']
                lat, lon = chosen['latitude'], chosen['longitude']
                print(f"Found {len(result)} options for {local}: {[x['name'] for x in result]}")
                print(f"\033[1m{local} picked: {chosen['name']} at {lat}, {lon}.\033[0m")

        coords[f'{local}'] = (lat, lon)

    return coords, place

In [34]:
def coordsToTrip(trip, coords):
    """Collapse an hourly schedule into the hours at which the position changes.

    The result always starts with hour 7 at coords['home']. Each later hour
    (8, 9, ...) is added only when its coordinates differ from the last
    recorded ones.

    Returns a dict {hour (int): (lat, lon)} in chronological order.
    """
    previous = coords['home']
    schedule = {7: previous}

    for hour in range(8, len(trip) + 7):
        here = coords[trip[f'{hour}']['location']]
        if here == previous:
            continue  # still at the same place, no new leg
        schedule[hour] = here
        previous = here

    return schedule

Getting the latitude and longitude for each of the locations

In [21]:
# Suffix appended to institute names before geocoding (getCoords' `sulfixo` argument).
sufixo = "Barão Geraldo, Campinas, SP, Brazil"
# trip index -> decoded schedule
trips = {}
# One {hour: (lat, lon)} dict per trip, as produced by coordsToTrip.
location_time_list = []
for i in range(len(responses)):
    json_response = json.loads(responses[i])
    trips[i] = json_response
    print(f"Getting coords for trip {i + 1}:")
    # `coords` and `names` are overwritten on every iteration; after the loop
    # they hold the values of the last trip only.
    coords, names = getCoords(trips[i], sufixo, places['institute'], start_radius=500, step_radius=200, limit_radius=1000)
    location_time_list.append(coordsToTrip(trips[i], coords))
    print("")
print("All trips have been successfully generated.")


Getting coords for trip 1:
Found 14 options for restaurant: ['Mahab esfiharia', 'CPS Culinária Urbana', 'LUI Taqueria', 'Feirinha da Unicamp', 'Pé de fruta - Romã', 'Cantina do Bello', 'Redi lanchonete', 'University Restaurant - Unicamp', 'Restaurante da Saturnino', 'Aulus Bar & Restaurant', 'Pizza Deck', 'Lolla Empanadas - Barão Geraldo', 'Chalé Sonho Meu', 'Restaurante da Casa do Professor Visitante']
[1mrestaurant picked: Mahab esfiharia at -22.8189956, -47.0730723.[0m
Could not find supermarket in a radius of 1000 meters. The student will not leave the place he is currently at.
Found 7 options for gym: ['Golden Fitness Academy', 'KUMARA yoga e terapia', 'Retz Muay Thai - Barão Geraldo', 'Ateliê Soma - Pilates & Fisioterapia Integrativa', '[FT30 Online] | Método de Treinamento Funcional', 'RODA OM - CAMINHOS HOLÍSTICOS E MULTIDIMENSIONAIS', 'Academia Cinética Fitness']
[1mgym picked: Golden Fitness Academy at -22.82363470000001, -47.0780421.[0m
Found 6 options for cafe: ['Maria 

Taking the locations and turning them into XML trips, which are written to finaltrips.rou.xml

In [35]:
# Location of SUMO's randomTrips.py. $SUMO_HOME/tools is preferred, because it is
# the same root the sumolib import at the top of the notebook uses. The
# previously hard-coded system path remains the fallback.
randomtrips_path = "/usr/share/sumo/tools/randomTrips.py"
if 'SUMO_HOME' in os.environ:
    _randomtrips_candidate = os.path.join(os.environ['SUMO_HOME'], 'tools', 'randomTrips.py')
    if os.path.isfile(_randomtrips_candidate):
        randomtrips_path = _randomtrips_candidate

def randomtrips_getArgs(net_path, additional_path, v_class, n_trips):
    """Build the command-line arguments for randomTrips.py.

    Args:
        net_path: network file (-n).
        additional_path: additional file passed with --additional.
        v_class: vehicle class, passed to both --vclass and --vehicle-class.
        n_trips: passed as -e.
            NOTE(review): -e is randomTrips' end time, so this equals the number
            of trips only with the default insertion period — confirm.

    Returns:
        list of str. Trips go to randtrips.trips.xml and routes to
        randtrips.rou.xml; --validate is always set.
    """
    args = [
        "-n", net_path,
        "-o", "randtrips.trips.xml",
        "-r", "randtrips.rou.xml",
        "--additional", additional_path,
        "--vclass", v_class,
        "--vehicle-class", v_class,
        "-e", str(n_trips),
        "--validate"
    ]
    return args

In [36]:
def getRandomTrips(net_path, add_path, vclass, n_trips):
    """Generate the random background trips by running randomTrips.py with python3.

    Raises subprocess.CalledProcessError when the script exits with a non-zero
    status (check=True).
    """
    command = ['python3', randomtrips_path]
    command.extend(randomtrips_getArgs(net_path, add_path, vclass, n_trips))
    subprocess.run(command, check=True)

In [37]:
def parseTripXML(location_time_list, parkingAreas, departure_times, styles, veh_types_per_student, n_vtypes, steps_per_stop = 10, rand_trips=None, out_file_name='finaltrips.rou.xml'):
    """Build the routes XML for all students and write it to `out_file_name`.

    Args:
        location_time_list: one schedule dict per student (see getPath).
        parkingAreas: parking areas used to resolve the stops.
        departure_times: list with the departure time of each trip.
        styles: list with the styles of the vehicles. The currently supported
            styles are "agg" for aggressive and "norm" for normal.
        veh_types_per_student: list with the vehicle type of each student.
        n_vtypes: number of vTypes created for each style.
        steps_per_stop: forwarded to getPath.
        rand_trips: optional file containing random trips. Its content from the
            first <vType...> line up to the closing </routes> tag is appended.
        out_file_name: file the XML is written to.

    Returns:
        The XML document as a string (the same text that was written).
    """
    importlib.reload(vehParameters)

    parts = [
        '<routes xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://sumo.dlr.de/xsd/routes_file.xsd">\n',
        '\n<!-- BEGIN - LLM Generated trips -->\n',
        '\n<!-- Vehicles -->\n',
    ]
    vehdists = vehParameters.generateVehicleTypes(styles, n_vtypes)
    parts.append(vehParameters.parseVehiclesXML(vehdists, styles))

    parts.append('\n')
    parts.append('<!-- Trips -->\n')
    for i in range(len(location_time_list)):
        path, stop_durations = getPath(location_time_list[i], parkingAreas, steps_per_stop=steps_per_stop)
        parts.append(pathToXML(path, f'veh{i + 1}', veh_types_per_student[i], departure_times[i], stop_durations) + '\n')

    parts.append('\n\n <!-- END - LLM Generated trips -->\n\n')

    parts.append('<!-- BEGIN - Random Trips -->\n\n')

    if rand_trips:
        start_read = False
        with open(rand_trips, 'r') as f:
            for line in f:
                stripped = line.strip()
                # Skip the file header; copying starts at the first vType element.
                if stripped.startswith('<vType'):
                    start_read = True
                if start_read:
                    # Compare the stripped line so an indented closing tag is
                    # recognised too and not copied into the middle of the output.
                    if stripped.startswith('</routes>'):
                        break
                    parts.append(line)

    parts.append('\n</routes>')
    xml = ''.join(parts)

    with open(out_file_name, 'w') as f:
        f.write(xml)

    return xml

In [38]:
# Departure time of each trip: (first scheduled hour - 7) * 10.
# coordsToTrip always starts a schedule at hour 7, so this is 0 for every trip.
# The factor 10 presumably mirrors steps_per_stop below — TODO confirm.
departure_times = []
for i in range(len(location_time_list)):
    departure_times.append((list(location_time_list[i].keys())[0] - 7) * 10)

styles = ["agg", "norm"] # Currently only two styles, but can be expanded
n_vtypes = 5 
# First half of the students drive 'agg', the rest 'norm'.
veh_style_per_student = []
for i in range(len(location_time_list)):
    if i < len(location_time_list) / 2:
        veh_style_per_student.append('agg')
    else: 
        veh_style_per_student.append('norm')   

# IMPORTANT: rand_trips decides how the random trips reach the final file.
# - Set to a trips file (e.g. randtrips.trips.xml): parseTripXML embeds the random trips, so
#   getAltRoutes computes alternative routes for them together with the LLM generated trips.
# - Set to None: the output holds only the LLM generated trips, and the last cell merges
#   randtrips.rou.xml into it with merge_routes.
# NOTE(review): older comments mention PATHGEN.rou.alt.xml / merged.rou.alt.xml; the code in
# this notebook writes finaltrips.rou.xml in both cases.
rand_trips = None
final_trips_file_name = "finaltrips.rou.xml"

getRandomTrips(net_path, add_path, "passenger", 10) # Generating random trips to randtrips.trips.xml / randtrips.rou.xml
parseTripXML(location_time_list, parkingAreas, departure_times, styles, veh_style_per_student, n_vtypes, steps_per_stop=10, rand_trips=rand_trips, out_file_name=final_trips_file_name) # Writing to final_trips_file_name (finaltrips.rou.xml)

Success.
Success.


'<routes xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://sumo.dlr.de/xsd/routes_file.xsd">\n\n<!-- BEGIN - LLM Generated trips -->\n\n<!-- Vehicles -->\n<vTypeDistribution id="agg">\n\t<vType id="v_agg0" minGap="0.2" accel="3.78" decel="2.96" startupDelay="0.11" sigma="0.65" tau="1.41" maxSpeed="117.92" speedFactor="2.04" lcStrategic="6.67" lcCooperative="0.09" lcSpeedGain="9.09" lcKeepRight="5.45" lcOvertakeRight="0.61" lcSpeedGainLookahead="3.23" lcOvertakeDeltaSpeedFactor="0.9" lcPushy="0.63" lcAssertive="2.79" lcImpatience="0.74" lcTimeToImpatience="7.69" lcLaneDiscipline="2.15" lcSigma="0.79" lcAccelLat="1.33" probability="0.055171419695758506">\n\t\t<param key="device.rerouting.probability" value="1.0"/>\n\t\t<param key="device.rerouting.adaptation-steps" value="18"/>\n\t\t<param key="device.rerouting.adaptation-interval" value="10"/>\n\t</vType>\n\t<vType id="v_agg1" minGap="0.15" accel="4.4" decel="3.62" startupDelay="0.31" sigma="0.8

Getting alternative routes for every route found

In [39]:
## Arguments to get alternative routes
# Location of SUMO's duaIterate.py. $SUMO_HOME/tools is preferred, because it is
# the same root the sumolib import at the top of the notebook uses. The
# previously hard-coded system path remains the fallback.
duaiterate_path = "/usr/share/sumo/tools/assign/duaIterate.py"
if 'SUMO_HOME' in os.environ:
    _duaiterate_candidate = os.path.join(os.environ['SUMO_HOME'], 'tools', 'assign', 'duaIterate.py')
    if os.path.isfile(_duaiterate_candidate):
        duaiterate_path = _duaiterate_candidate

def duaiterate_getArgs(net_path, trips_path, additional_path, iterations):
    """Build the command-line arguments for duaIterate.py.

    Args:
        net_path: network file (-n).
        trips_path: trips file to route (-t).
        additional_path: additional file, given both as --additional and as the
            duarouter--additional-files option (presumably so duarouter also
            sees the parking areas referenced by the stops — confirm).
        iterations: passed as -l.

    Returns:
        list of str.
    """
    args = [
        "-n", net_path,
        "-t", trips_path,
        "--additional", additional_path,
        "duarouter--additional-files", additional_path,
        "-l", str(iterations),
    ]
    return args

In [40]:
def getAltRoutes(net_path, trips_path, additional_path, iterations):
    """Run duaIterate.py and keep the alternative routes of its last iteration.

    Steps:
      1. remove leftover PATHGEN_*.rou.alt.xml files from the working directory;
      2. run duaIterate.py for `iterations` steps;
      3. move the last step's finaltrips_NNN.rou.alt.xml out of its step
         directory, delete the step directories, and rename the file to
         finaltrips.rou.xml.

    Step directories and files are addressed with three-digit, zero-padded
    step numbers (000, 001, ..., 010, ...), so ten or more iterations work too.

    NOTE(review): the 'finaltrips' prefix is hard-coded, so `trips_path` is
    expected to be finaltrips.rou.xml; the result overwrites that file.

    A failing duaIterate run is reported with print() and nothing is moved.
    """
    for file in os.listdir('.'):
        if file.startswith('PATHGEN_') and file.endswith('.rou.alt.xml'):
            os.remove(file)

    try:
        subprocess.run(['python3', duaiterate_path] + duaiterate_getArgs(net_path, trips_path, additional_path, iterations), check=True)
    except subprocess.CalledProcessError as e:
        print(f"{e}")
        return

    last_step = f'{iterations - 1:03d}'
    shutil.move(f'{last_step}/finaltrips_{last_step}.rou.alt.xml', f'finaltrips_{last_step}.rou.alt.xml')

    # The per-step directories are no longer needed once the last result is saved.
    for i in range(iterations):
        step_dir = f'{i:03d}'
        if os.path.exists(step_dir):
            shutil.rmtree(step_dir)

    for file in os.listdir('.'):
        if file.startswith('finaltrips_') and file.endswith('.rou.alt.xml'):
            os.rename(file, 'finaltrips.rou.xml')

In [41]:
# Three duaIterate iterations over the trips written by parseTripXML; the final
# alternative routes replace finaltrips.rou.xml.
getAltRoutes(net_path, 'finaltrips.rou.xml', add_path, 3)

> Executing step 0
>> Running router on finaltrips.rou.xml
>>> Begin time: 2024-11-29 15:29:37.827446
>>> End time: 2024-11-29 15:29:38.362792
>>> Duration: 0:00:00.535346
<<
>> Running simulation
>>> Begin time: 2024-11-29 15:29:38.362900
>>> End time: 2024-11-29 15:29:39.477525
>>> Duration: 0:00:01.114625
<<
< Step 0 ended (duration: 0:00:01.650212)
------------------

> Executing step 1
>> Running router on 000/finaltrips_000.rou.alt.xml
>>> Begin time: 2024-11-29 15:29:39.484580
>>> End time: 2024-11-29 15:29:40.020092
>>> Duration: 0:00:00.535512
<<
>> Running simulation
>>> Begin time: 2024-11-29 15:29:40.020147
>>> End time: 2024-11-29 15:29:41.089629
>>> Duration: 0:00:01.069482
<<
< Step 1 ended (duration: 0:00:01.605209)
------------------

> Executing step 2
>> Running router on 001/finaltrips_001.rou.alt.xml
>>> Begin time: 2024-11-29 15:29:41.104284
>>> End time: 2024-11-29 15:29:41.644629
>>> Duration: 0:00:00.540345
<<
>> Running simulation
>>> Begin time: 2024-11-29 15

To use the resulting file, just set finaltrips.rou.xml (the name the last alternative-routes file is renamed to) as a trips file in osm.sumocfg

If there is no need to generate alternative routes for the random trips, the following XML parser must be used to add them:

In [42]:
def _route_file_body(path):
    """Return a route file's text from its first <vType...> line up to, but excluding, </routes>."""
    body = []
    started = False
    with open(path, 'r') as f:
        for line in f:
            stripped = line.strip()
            # Compare the stripped line so an indented closing tag is recognised too.
            if stripped.startswith('</routes>'):
                break
            if not started and stripped.startswith('<vType'):
                started = True
            if started:
                body.append(line)
    return ''.join(body)


def merge_routes(alternative_routes, random_routes):
    """Merge two route files into one and write it to finaltrips.rou.xml.

    From each input, everything between its first <vType...> line and its
    closing </routes> tag is copied. The merged document gets its own single
    <routes> root, so the result is well-formed even when an input's closing
    tag is indented or the input contributes nothing.

    Both inputs are read completely before the output is written, so
    `alternative_routes` may be finaltrips.rou.xml itself.

    Args:
        alternative_routes: route file with the LLM generated trips.
        random_routes: route file with the random trips.

    Returns:
        The merged XML as a string (the same text that was written).
    """
    xml = '<routes xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://sumo.dlr.de/xsd/routes_file.xsd">\n'

    xml += '\n<!-- BEGIN - LLM Generated trips -->\n'
    xml += _route_file_body(alternative_routes)
    xml += '\n\n <!-- END - LLM Generated trips -->\n\n'

    xml += '<!-- BEGIN - Random Trips -->\n\n'
    xml += _route_file_body(random_routes)

    # Close the root here instead of relying on the second file's closing tag.
    xml += '\n</routes>'

    with open('finaltrips.rou.xml', 'w') as f:
        f.write(xml)

    return xml

In [43]:
# With rand_trips unset, the random trips were left out of the routed file, so the routes
# randomTrips.py wrote (randtrips.rou.xml) are merged in here. merge_routes writes the
# result to finaltrips.rou.xml, which is the file to use in osm.sumocfg.
if rand_trips is None:
    merge_routes('finaltrips.rou.xml', 'randtrips.rou.xml')