In [None]:
# Imports cell
import mercury as mr

from random import randint, seed
from dotenv import load_dotenv
import sys, pickle, shutil, os.path
from tud_sumo.simulation import Simulation
from tud_sumo.plot import Plotter, MultiPlotter
from datetime import datetime
from pathlib import Path
import pandas as pd
from lxml import etree


In [None]:
app = mr.App(title="TFM 2025/26 Assignment 1",description="CT3505-24: Assignment 1 2025/26",allow_download=False,allow_share=False)

# CT3505-24 - Traffic Flows and Management - 2025/26

## Assignment 1: Circular road network

## Using this interactive notebook
**Please refer to the course's Brightspace page. Find more information on this notebook in the README.md and INSTALL.md files.**

- Choose your vehicle and simulation parameters on the controls pane (<-)
- Click on (re-)Run simulation to let the notebook interact with the simulation backend (SUMO)
- Results are visualised below, under "Simulation results"
- If "Compare with previous simulation" is active when selecting "(re)-Run simulation", the latest simulation results will be kept and comparative KPIs will be shown for both that and the newly adapted inputs.
- If "Store simulation results" is selected, simulation results will be stored in the *./experiments* folder. Simulation results are stored in pickled Pandas dataframes. Results can therefore be inspected manually through Pandas, other than through this notebook.
- If "Show simulation" is selected, SUMO's graphical user interface will open when a simulation is executed.

In [None]:
#Graphical user interface


HV_minGap_slider = mr.Slider(value=0.25,min=0.05,max=1.25,label="Minimum Desired Gap [m]",step=0.05)
HV_accel_slider = mr.Slider(value=1.5,min=0.05,max=4,label="Vehicle Accelleration [m/s^2]",step=0.1)
HV_decel_slider = mr.Slider(value=2,min=0.05,max=4,label="Vehicle Deceleration [m/s^2]",step=0.05)
HV_sigma_slider = mr.Slider(value=0.5,min=0,max=1,label="Vehicle homogeneity [0 = perfectly homogeneous, 0.5 = typical traffic]",step=0.05)
HV_tau_slider = mr.Slider(value=1,min=0.05,max=5,label="Desired Minimum Time Headway [s]",step=0.05)
numveh_slider = mr.Slider(value=20,min=10,max=60,label="Total demand [veh]",step=1)
#BA_button = mr.Button(label='Update vehicle behaviour parameters.')

run_sim_button = mr.Button(label='(re-)Run simulation')


compare_sim_results_toggle = mr.Checkbox(value=False,label="Compare with previous simulation")
#use_cache_toggle = mr.Checkbox(value=False,label="Use cache")
#if not use_cache_toggle.value:
save_sim_toggle = mr.Checkbox(value=False,label="Store simulation results")
see_GUI_toggle = mr.Checkbox(value=False,label="Show simulation")

#else:
    #compare_sim_results_toggle = mr.Checkbox(value=False,label="Compare with previous simulation",disabled=True)
    #save_sim_toggle = mr.Checkbox(value=False,label="Store simulation results",disabled=True)
    #see_GUI_toggle = mr.Checkbox(value=False,label="Show simulation",disabled=True)

    



In [None]:
# Convenience functions
import re
def readSimDescription(filename=""):
    description=""
    if filename != "" and filename.split('.')[-1]=='txt' and os.path.exists(filename): #some nice checking that this is a text file with desired outputs
        raw_data=""
        with open(filename,'r') as f:
            raw_data = f.read().replace('\n', '')
        try: #this bit is try catch'd, if it fails description remains = ""
            raw_data=raw_data.split("Description:")[1]
            raw_data=raw_data.split("*")[0]
            raw_data=re.sub(r'\s{2,}', ' ', raw_data)    
            raw_data=raw_data.replace('|',' ')
            description=raw_data
        except Exception:
            pass
                

    return description

In [None]:
#Simulation backend
simComplete=False
savesDir=Path(".").resolve().joinpath("experiments")
cacheDir=Path(".").resolve().joinpath("cache")
old_sim = None

def updateVehicleClasses(HV_user_values_dict=dict(),HV_number=1,rouFile='./Network/circle.rou.xml'): #collects user input for Behavioural Adaptation and updates the corresponding classes in the scenario's rou file
    HV_default_values_dict={'minGap':0.25,'accel':1.5,'decel':2,'tau':0.5,'sigma':0.5}
    tree=etree.parse(rouFile)
    root=tree.getroot()
    for element in root.iter("vType"):
        attribs=element.attrib
        if element.get("id")=="car":
            #Adapt parameters based on user input
            for k in HV_default_values_dict.keys():
                if k in HV_user_values_dict:
                    attribs[k]=str(HV_user_values_dict[k])
                else:
                    attribs[k]=str(HV_default_values_dict[k])
    for element in root.iter("flow"):
        attribs=element.attrib
        if element.get("id")=="v":
            attribs["number"]=str(HV_number) #Update number of vehicles in the flow

    tree.write(rouFile)


def runScenario(my_sim):

    # Start the simulation, defining the sumo config files. Add "-gui" to the command line to run with the GUI.
    my_sim.start("./Network/circle.sumocfg", get_individual_vehicle_data=False, gui=("-gui" in sys.argv) or forceGUI,
                 seed=sim_seed, units="metric", suppress_pbar=True) # Units can either be metric (km,kmph)/imperial (mi,mph)/UK (km,mph). All data collected is in these units.
    
    my_sim.add_tracked_edges(["B999B1000","B999B1000.327"])
    n, sim_dur, warmup = 1, 12*60 / my_sim.step_length, 0 / my_sim.step_length
    
   
    if warmup > 0:
        my_sim.step_through(n_steps=warmup, pbar_max_steps=sim_dur+warmup, keep_data=False)

    while my_sim.curr_step < sim_dur + warmup:

        
        
        # Step through n seconds.
        my_sim.step_through(n_seconds=n, pbar_max_steps=sim_dur+warmup)

       



    # End the simulation.
    my_sim.end()


forceGUI = see_GUI_toggle.value
scenario_name="circle_"+datetime.now().strftime("%Y-%m-%d %H:%M:%S")
scenario_savefile=str(savesDir.joinpath("circle_"+datetime.now().strftime("%Y-%m-%d %H%M%S")))




if run_sim_button.clicked:
    try:
        HV_user_values_dict={'minGap':HV_minGap_slider.value,'accel':HV_accel_slider.value,'decel':HV_decel_slider.value,'sigma':HV_sigma_slider.value,'tau':HV_tau_slider.value}
        HV_number=numveh_slider.value
        updateVehicleClasses(HV_user_values_dict=HV_user_values_dict,HV_number=HV_number)
    except NameError as e:
    #If the button isn't shown, skip this part entirely
        pass
    
    if not simComplete:
        print(f'Simulating scenario...')
        # Initialise the simulation object.
        scenario_desc = f"Circular roadway. "+datetime.now().strftime("%Y-%m-%d %H:%M:%S") #Procedurally generated scenario description for later readability
        my_sim = Simulation(scenario_name, scenario_desc=scenario_desc)
        # Add "-seed {x}" to the command line to set a seed for the simulation
        sim_seed = "1" if "-seed" not in sys.argv[:-1] else sys.argv[sys.argv.index("-seed")+1]
        seed(int(sim_seed))
        try:
            runScenario(my_sim)
        except Exception as e:
            print("Scenario computation failed. Error detail:", e)
            pass #Simply discard this simulation
        
        simComplete=True
        if save_sim_toggle.value: #boolean
            my_sim.save_data(scenario_savefile)
            
            
        #if compare_sim_results_toggle.value:
        my_sim.save_data('./experiments/curr_sim.pkl') #just do it anyway, cleaner.
        my_sim.print_summary('./experiments/curr_sim.txt')
        
        
    

# Simulation results

In [None]:
# Visualisation cells: show relevant sim results directly from my_sim object (skip saving)
if simComplete and not compare_sim_results_toggle.value:
    plt = Plotter(my_sim,time_unit="minutes")

elif simComplete and compare_sim_results_toggle.value:
    old_sim='./experiments/old_sim.pkl'
    my_sim_file='./experiments/curr_sim.pkl'
    #Print relevant simulation information here
    old_sim_desc='./experiments/old_sim.txt'
    my_sim_desc='./experiments/curr_sim.txt'
    print("Current simulation scenario:\n" + readSimDescription(my_sim_desc))
    print("Previous simulation scenario:\n" + readSimDescription(old_sim_desc))
    mplt = MultiPlotter(scenario_label=f"{readSimDescription(my_sim_desc)} [current] vs \n {readSimDescription(old_sim_desc)} [previous] \n",time_unit="minutes")
    mplt.add_simulations([my_sim_file, old_sim], labels=["Current scenario","Previous scenario"] )
    

## Trajectories

In [None]:
trackedEdges=["B999B1000","B999B1000.327"] #hardcoded for convenience, could allow for some selection with another input widget if I so please - but then needs a good label dictionary to avoid having these numerical IDs - and a good explanatory figure!

if simComplete and not compare_sim_results_toggle.value:
    plt.plot_trajectories(trackedEdges, lane_idx=0)
elif simComplete and compare_sim_results_toggle.value:
    # This needs to be done with two plt objects loaded and plotted manually one by one (oof.)
    plt = Plotter(my_sim_file,time_unit="minutes")
    plt.plot_trajectories(trackedEdges)

    plt2 = Plotter(old_sim,time_unit="minutes")
    plt2.plot_trajectories(trackedEdges)


## Space-time diagrams

In [None]:
trackedEdges=["B999B1000","B999B1000.327"] #hardcoded for convenience, could allow for some selection with another input widget if I so please - but then needs a good label dictionary to avoid having these numerical IDs - and a good explanatory figure!

if simComplete and not compare_sim_results_toggle.value:
    plt.plot_space_time_diagram(trackedEdges)
elif simComplete and compare_sim_results_toggle.value:
    # This needs to be done with two plt objects loaded and plotted manually one by one (oof.)
    plt = Plotter(my_sim_file,time_unit="minutes")
    plt.plot_space_time_diagram(trackedEdges)

    plt2 = Plotter(old_sim,time_unit="minutes")
    plt2.plot_space_time_diagram(trackedEdges)


## Road speeds

In [None]:
trackedEdges=["B999B1000","B999B1000.327"] #hardcoded for convenience, could allow for some selection with another input widget if I so please - but then needs a good label dictionary to avoid having these numerical IDs - and a good explanatory figure!

if simComplete and not compare_sim_results_toggle.value:
    for edge in trackedEdges:
        plt.plot_edge_data(edge,"speeds")
elif simComplete and compare_sim_results_toggle.value:
    # This needs to be done with two plt objects loaded and plotted manually one by one (oof.)
    plt = Plotter(my_sim_file,time_unit="minutes")
    for edge in trackedEdges:
        plt.plot_edge_data(edge,"speeds")

    plt2 = Plotter(old_sim,time_unit="minutes")
    for edge in trackedEdges:
        plt2.plot_edge_data(edge,"speeds")
    


## Road densities

In [None]:
trackedEdges=["B999B1000","B999B1000.327"] #hardcoded for convenience, could allow for some selection with another input widget if I so please - but then needs a good label dictionary to avoid having these numerical IDs - and a good explanatory figure!

if simComplete and not compare_sim_results_toggle.value:
    for edge in trackedEdges:
        plt.plot_edge_data(edge,"densities")
elif simComplete and compare_sim_results_toggle.value:
    # This needs to be done with two plt objects loaded and plotted manually one by one (oof.)
    plt = Plotter(my_sim_file,time_unit="minutes")
    for edge in trackedEdges:
        plt.plot_edge_data(edge,"densities")

    plt2 = Plotter(old_sim,time_unit="minutes")
    for edge in trackedEdges:
        plt2.plot_edge_data(edge,"densities")
    

## Road flows

In [None]:
trackedEdges=["B999B1000","B999B1000.327"] #hardcoded for convenience, could allow for some selection with another input widget if I so please - but then needs a good label dictionary to avoid having these numerical IDs - and a good explanatory figure!

if simComplete and not compare_sim_results_toggle.value:
    for edge in trackedEdges:
        plt.plot_edge_data(edge,"flows")
elif simComplete and compare_sim_results_toggle.value:
    # This needs to be done with two plt objects loaded and plotted manually one by one (oof.)
    plt = Plotter(my_sim_file,time_unit="minutes")
    for edge in trackedEdges:
        plt.plot_edge_data(edge,"flows")

    plt2 = Plotter(old_sim,time_unit="minutes")
    for edge in trackedEdges:
        plt2.plot_edge_data(edge,"flows")
    

## Total number of vehicles in the ring road

In [None]:
if simComplete and not compare_sim_results_toggle.value:
    plt.plot_vehicle_data("no_vehicles")
elif simComplete and compare_sim_results_toggle.value:
    # This needs to be done with two plt objects loaded and plotted manually one by one (oof.)
    plt = Plotter(my_sim_file,time_unit="minutes")
    plt.plot_vehicle_data("no_vehicles")

    plt2 = Plotter(old_sim,time_unit="minutes")
    plt2.plot_vehicle_data("no_vehicles")

In [None]:
#Finally, replace old with current simulation, where applicable
if simComplete:
    shutil.copy("./experiments/curr_sim.pkl","./experiments/old_sim.pkl")
    shutil.copy("./experiments/curr_sim.txt","./experiments/old_sim.txt")
