In [1]:
from nagel_schreckenberg_model import *
import numpy as np
import pandas as pd
import os
import plotly.graph_objects as go

In [2]:
def save_data_to_individual_csv(flux_no_traffic_light, flux_by_traffic_light_time, output_directory):
    """Write the collected flux samples to CSV files inside ``output_directory``.

    Parameters
    ----------
    flux_no_traffic_light : mapping
        ``{number_of_cars: iterable_of_flux_samples}`` for the runs without a
        traffic light.
    flux_by_traffic_light_time : mapping
        ``{number_of_cars: {flux_marker_position: {traffic_light_time:
        iterable_of_flux_samples}}}`` for the runs with a traffic light.
    output_directory : str
        Target directory; it is created (including parents) when missing.

    Files written
    -------------
    ``no_traffic_light_data.csv``
        One row per sample, columns ``Number of Cars``, ``Average Flux`` and
        ``Configuration``.
    ``traffic_data_pos{pos}_time{time}.csv``
        One file per (flux marker position, traffic light time) pair, columns
        ``Number of Cars``, ``Flux Marker Position``, ``Traffic Light Time``
        and ``Average Flux``.

    Every file always carries its header row, even when it holds no samples,
    so it can be read back with ``pd.read_csv`` and indexed by column name.
    """
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(output_directory, exist_ok=True)

    # Runs without a traffic light: flatten to one row per flux sample.
    no_light_columns = ['Number of Cars', 'Average Flux', 'Configuration']
    no_light_rows = [
        {'Number of Cars': nc, 'Average Flux': flux, 'Configuration': 'No Traffic Lights'}
        for nc, flux_values in flux_no_traffic_light.items()
        for flux in flux_values
    ]
    # Passing columns= keeps the header even when there are no rows at all.
    df_no_traffic = pd.DataFrame(no_light_rows, columns=no_light_columns)
    df_no_traffic.to_csv(os.path.join(output_directory, "no_traffic_light_data.csv"), index=False)

    # Runs with a traffic light: regroup from "per car count" to
    # "per (position, timing)" so each combination ends up in its own file.
    traffic_columns = ['Number of Cars', 'Flux Marker Position', 'Traffic Light Time', 'Average Flux']
    position_timing_data = {}
    for nc, positions in flux_by_traffic_light_time.items():
        for pos, times in positions.items():
            for time, flux_values in times.items():
                position_timing_data.setdefault((pos, time), []).extend(
                    {'Number of Cars': nc, 'Flux Marker Position': pos,
                     'Traffic Light Time': time, 'Average Flux': flux}
                    for flux in flux_values
                )

    # Write each position and timing combination to its own CSV file.
    for (pos, time), rows in position_timing_data.items():
        filename = f"traffic_data_pos{pos}_time{time}.csv"
        df_traffic = pd.DataFrame(rows, columns=traffic_columns)
        df_traffic.to_csv(os.path.join(output_directory, filename), index=False)

In [3]:
def _describe_results_csv(csv_file):
    """Return ``(sort_key, legend_label)`` for one results CSV path.

    Only the base name is inspected, so directory components (which may
    themselves contain ``_``, ``pos`` or ``time``) cannot corrupt the parsing.
    """
    filename = os.path.basename(csv_file)
    if "no_traffic_light_data" in filename:
        return (0, 0, 0), "No Traffic Lights"  # Always sorted first

    # e.g. "traffic_data_pos199_time5.csv" -> ["pos199", "time5"]
    parts = filename.replace('traffic_data_', '').replace('.csv', '').split('_')
    pos = next((part for part in parts if 'pos' in part), 'pos0').replace('pos', '')
    time = next((part for part in parts if 'time' in part), 'time0').replace('time', '')

    # Non-numeric tokens (such as the "None" position) sort as 0.
    sort_key = (1, int(pos) if pos.isdigit() else 0, int(time) if time.isdigit() else 0)
    if pos == "None":
        pos = "Opposite extreme"
    return sort_key, f"FMP: {pos} | TLT: {time}"


def plot_from_csv(csv_files):
    """Show an interactive scatter plot of average flux versus number of cars.

    Each CSV becomes one trace, initially hidden (``legendonly``) so the reader
    picks configurations from the legend or via the two toggle buttons. Files
    whose name contains ``no_traffic_light_data`` are plotted first; the rest
    are ordered by the position and time encoded in names of the form
    ``traffic_data_pos{pos}_time{time}.csv``.

    Parameters
    ----------
    csv_files : iterable of str
        Paths of CSV files providing ``Number of Cars`` and ``Average Flux``
        columns.

    Raises
    ------
    ValueError
        If ``csv_files`` is empty, because the axis ranges cannot be derived.
    """
    csv_files = list(csv_files)
    if not csv_files:
        raise ValueError("plot_from_csv needs at least one CSV file to plot")

    # Read every file exactly once, keeping its sort key and label alongside.
    entries = []
    for csv_file in csv_files:
        sort_key, label = _describe_results_csv(csv_file)
        entries.append((sort_key, label, pd.read_csv(csv_file)))

    # list.sort is stable: files with equal keys keep their input order.
    entries.sort(key=lambda entry: entry[0])

    # Shared axis ranges across all traces, with a 10% buffer above the maxima.
    max_cars = max(df['Number of Cars'].max() for _, _, df in entries) * 1.1
    max_flux = max(df['Average Flux'].max() for _, _, df in entries) * 1.1

    fig = go.Figure()
    for _, label, df in entries:
        fig.add_trace(go.Scatter(x=df['Number of Cars'], y=df['Average Flux'],
                                 mode='markers', name=label,
                                 marker=dict(opacity=0.1), visible="legendonly"))  # Start as unselected in legend

    trace_count = len(entries)

    # Layout plus the "toggle all" buttons, which set every trace's visibility.
    fig.update_layout(
        xaxis_title="Number of Cars",
        yaxis_title="Average Flux",
        legend_title="Configuration",
        title="Interactive Traffic Simulation Analysis",
        xaxis=dict(range=[0, max_cars]),
        yaxis=dict(range=[0, max_flux]),
        updatemenus=[
            dict(
                type="buttons",
                direction="right",
                x=0.7,
                xanchor="left",
                y=1.22,
                yanchor="top",
                buttons=[
                    dict(
                        label="Toggle All On",
                        method="update",
                        args=[{"visible": [True] * trace_count}]),
                    dict(
                        label="Toggle All Off",
                        method="update",
                        args=[{"visible": ["legendonly"] * trace_count}])
                ]
            )
        ]
    )
    fig.show()

In [4]:
def plot_all_csv_in_folder(folder_path):
    """Plot every file in ``folder_path`` whose name ends with ``.csv``.

    The folder is listed once (no recursion); each matching name is joined
    with ``folder_path`` and the resulting paths are handed to
    ``plot_from_csv`` in a single call.
    """
    csv_paths = []
    for entry in os.listdir(folder_path):
        # The suffix check is case-sensitive; other entries are skipped.
        if entry.endswith('.csv'):
            csv_paths.append(os.path.join(folder_path, entry))

    plot_from_csv(csv_paths)

In [5]:
# --- Arguments passed to Road(...) for every run ---------------------------
road_size = 200
max_speed = 5
brake_prob = 0.1
iterations = 800

# --- Sweep: car counts and repetitions per configuration -------------------
num_cars_range = np.arange(1, 200)
simulations_per_num_cars = 3

# --- Flux marker and traffic light settings --------------------------------
# Marker positions: None (labelled "Opposite extreme" in the plots), the last
# index of the road, and the point halfway between the road's midpoint and
# its last index (integer arithmetic; 149 for a road of 200).
flux_marker_positions = [
    None,
    road_size - 1,
    ((road_size - 1) // 2 + (road_size - 1)) // 2,
]
# None selects the runs without a traffic light; 0 is the light's position.
traffic_light_position = [None, 0]
traffic_light_time = [5, 25, 50, 100, 400]

# Placeholder; rebuilt with one entry per configuration before the runs start.
flux_by_traffic_light_time = dict()

In [6]:
# Empty sample arrays, one per configuration, filled in by the simulation runs.
# Runs without a traffic light are keyed by car count only.
flux_no_traffic_light = dict((nc, np.array([])) for nc in num_cars_range)

# Runs with a traffic light are keyed car count -> marker position -> light time.
flux_by_traffic_light_time = dict(
    (
        nc,
        dict(
            (fmp, dict((tlt, np.array([])) for tlt in traffic_light_time))
            for fmp in flux_marker_positions
        ),
    )
    for nc in num_cars_range
)

In [7]:
# Run every configuration and append each run's average flux to its accumulator.
# A traffic light position of None selects the baseline without a traffic light;
# any other value is passed to Road as the light's position.
for tlp in traffic_light_position:
    if tlp is None:
        # Baseline: Road is built without marker / light arguments.
        for nc in num_cars_range:
            for s in range(simulations_per_num_cars):
                r = Road(road_size, nc, max_speed, brake_prob, iterations)
                r.run(terminal_printing=False)
                flux_no_traffic_light[nc] = np.append(flux_no_traffic_light[nc], r.calculate_average_flux())
    else:
        # NOTE(review): results are keyed by car count, marker position and
        # light time only, so samples from several light positions would be
        # pooled together -- fine while a single position is configured.
        for nc in num_cars_range:
            for fmp in flux_marker_positions:
                for tlt in traffic_light_time:
                    for s in range(simulations_per_num_cars):
                        # Use the position being iterated rather than a
                        # hard-coded list index.
                        r = Road(road_size, nc, max_speed, brake_prob, iterations, fmp, tlp, tlt)
                        r.run(terminal_printing=False)
                        flux_by_traffic_light_time[nc][fmp][tlt] = np.append(flux_by_traffic_light_time[nc][fmp][tlt], r.calculate_average_flux())

In [8]:
# Persist both result sets as CSV files under ./results.
save_data_to_individual_csv(
    flux_no_traffic_light,
    flux_by_traffic_light_time,
    output_directory='./results',
)

In [9]:
# Plot every CSV found in ./results as one interactive figure.
plot_all_csv_in_folder(folder_path='./results')