In [1]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import mannwhitneyu
from IPython.display import display
import zipfile
from collections import defaultdict

import warnings
# silence FutureWarning messages for the whole notebook session
warnings.simplefilter(action='ignore', category=FutureWarning)

# None removes the row limit, so displayed DataFrames are never truncated;
# prefer .head()/.sample() on large frames to keep cell output manageable
pd.set_option('display.max_rows', None)

In [2]:
# simulation output folder
experiment_name = 'test_policy_run_1'

# load Java simulation data
# experiment_folder = fr'F:\jesse_sim_results\{experiment_name}'
# built with os.path.join so the relative path uses the separator of the
# current platform instead of hard-coded Windows backslashes
experiment_folder = os.path.join('..', 'data', experiment_name)

# available files (each CSV is stored inside a zip archive with the same stem)
input_values = 'inputValues.csv'
intermediate_output = 'intermediateOutputData.csv'
single_output = 'singleOutputData.csv'
sequence_output = 'sequenceOutputData.csv'
lane_change_output = 'laneChangeOutputData.csv'
collision_output = 'collisionOutputData.csv'

In [3]:
# function to get a list of all file/folder names within a folder
def get_file_names(path):
    """Return the names of all entries (files and folders) directly inside `path`.

    The names are returned in sorted (lexicographic) order. `os.listdir` makes
    no ordering guarantee, and code that stops at a particular folder name
    would otherwise process a platform-dependent subset.

    Parameters
    ----------
    path : str
        Folder to list.

    Returns
    -------
    list of str
        Entry names (not full paths), sorted.

    Raises
    ------
    OSError
        Propagated from `os.listdir`, e.g. when `path` does not exist.
    """
    return sorted(os.listdir(path))

# load available seed folders
# (lists every entry of the experiment folder; the loader functions in this
# section list the folder themselves rather than reading this variable)
seed_folders = get_file_names(experiment_folder)

In [4]:
# function to retrieve any data within the project folder as dataframe
def load_scenario_dataframe(scenario, columns_of_interest, folder, file, input_file,
                            stop_at_seed='seed_2'):
    """Collect selected output columns from every run whose inputs match `scenario`.

    The experiment folder is expected to contain one sub-folder per seed, each
    holding one sub-folder per run. Every run folder holds zip archives named
    after the CSV they contain (``<stem>.zip`` containing ``<stem>.csv``).

    Parameters
    ----------
    scenario : dict
        Maps input column names to required values. A run matches when the
        first row of its input CSV equals every value in this dict.
    columns_of_interest : list of str
        Columns to keep from the data CSV.
    folder : str
        Experiment folder containing the seed folders.
    file : str
        Name of the data CSV (e.g. ``'sequenceOutputData.csv'``).
    input_file : str
        Name of the input-values CSV.
    stop_at_seed : str or None, optional
        Seed folders are visited in sorted name order and loading stops as soon
        as this folder name is reached (that folder is not loaded). The default
        ``'seed_2'`` keeps the limit that used to be hard-coded here; pass
        ``None`` to load every seed.

    Returns
    -------
    pandas.DataFrame
        The selected columns of all matching runs, plus a ``run`` column (the
        number parsed from the run folder name) and one column per input value,
        with a fresh 0..n-1 index. Empty when no run matches.
    """
    # collect per-run frames and concatenate once (concat in a loop is quadratic)
    frames = []

    # sorted so that the stop_at_seed cut-off does not depend on listing order
    for seed_name in sorted(get_file_names(folder)):
        if stop_at_seed is not None and seed_name == stop_at_seed:
            break

        seed_path = os.path.join(folder, seed_name)
        # ignore stray files next to the seed folders
        if not os.path.isdir(seed_path):
            continue

        for run_folder in sorted(get_file_names(seed_path)):
            data_folder = os.path.join(seed_path, run_folder)
            if not os.path.isdir(data_folder):
                continue

            # run folders are named '<prefix>_<number>'
            run_number = int(run_folder.split('_')[1])

            # read the input values of this run (the zip shares the CSV's stem)
            input_zip = os.path.join(data_folder, f'{os.path.splitext(input_file)[0]}.zip')
            with zipfile.ZipFile(input_zip, 'r') as zip_ref:
                with zip_ref.open(input_file) as input_data_file:
                    df_input = pd.read_csv(input_data_file)

            # only the first input row is compared against the scenario
            row_matches_scenario = all(
                df_input.iloc[0][key] == value for key, value in scenario.items()
            )
            if not row_matches_scenario:
                continue

            # read the requested output data of this run
            data_zip = os.path.join(data_folder, f'{os.path.splitext(file)[0]}.zip')
            with zipfile.ZipFile(data_zip, 'r') as zip_ref:
                with zip_ref.open(file) as data_file:
                    df_run = pd.read_csv(data_file)

            df_interest = df_run[columns_of_interest].copy()
            df_interest['run'] = run_number

            # broadcast input data to all rows in df_interest
            for col in df_input.columns:
                df_interest[col] = df_input[col].iloc[0]

            frames.append(df_interest)

    # pd.concat raises on an empty list, so return an empty frame explicitly
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)


# function to retrieve the collision data of all runs matching a scenario as dataframe
def load_scenario_collision_dataframe(scenario, folder):
    """Collect the collision records of every run whose inputs match `scenario`.

    Reads the module-level `collision_output` and `input_values` file names.
    All seed folders inside `folder` are visited (in sorted name order); each
    run folder is expected to hold ``<stem>.zip`` archives containing
    ``<stem>.csv``.

    Parameters
    ----------
    scenario : dict
        Maps input column names to required values. A run matches when the
        first row of its input CSV equals every value in this dict.
    folder : str
        Experiment folder containing the seed folders.

    Returns
    -------
    pandas.DataFrame
        All collision rows of the matching runs, plus a ``run`` column (the
        number parsed from the run folder name) and one column per input value,
        with a fresh 0..n-1 index. Empty when no collision data was found.

    Notes
    -----
    Only an empty collision CSV is skipped silently. Any other read error
    (e.g. a malformed file) is raised instead of being swallowed.
    """
    # collision file
    file = collision_output
    input_file = input_values

    # collect per-run frames and concatenate once (concat in a loop is quadratic)
    frames = []

    for seed_name in sorted(get_file_names(folder)):
        seed_path = os.path.join(folder, seed_name)
        # ignore stray files next to the seed folders
        if not os.path.isdir(seed_path):
            continue

        for run_folder in sorted(get_file_names(seed_path)):
            data_folder = os.path.join(seed_path, run_folder)
            if not os.path.isdir(data_folder):
                continue

            # run folders are named '<prefix>_<number>'
            run_number = int(run_folder.split('_')[1])

            # read the input values of this run (the zip shares the CSV's stem)
            input_zip = os.path.join(data_folder, f'{os.path.splitext(input_file)[0]}.zip')
            with zipfile.ZipFile(input_zip, 'r') as zip_ref:
                with zip_ref.open(input_file) as input_data_file:
                    df_input = pd.read_csv(input_data_file)

            # only the first input row is compared against the scenario
            row_matches_scenario = all(
                df_input.iloc[0][key] == value for key, value in scenario.items()
            )
            if not row_matches_scenario:
                continue

            data_zip = os.path.join(data_folder, f'{os.path.splitext(file)[0]}.zip')
            with zipfile.ZipFile(data_zip, 'r') as zip_ref:
                with zip_ref.open(file) as data_file:
                    try:
                        df_collisions = pd.read_csv(data_file)
                    except pd.errors.EmptyDataError:
                        # a run without collisions has an empty CSV: nothing to add
                        continue

            # do process collision dataframe if it has data
            print(f'Collisions found in {seed_name}-{run_number}')
            df_collisions['run'] = run_number

            # broadcast input data to all rows of collision dataframe
            for col in df_input.columns:
                df_collisions[col] = df_input[col].iloc[0]

            frames.append(df_collisions)

    # pd.concat raises on an empty list, so return an empty frame explicitly
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)

In [5]:
# scenario 0: input values a run must have to be selected by the loaders
# (all vehicles at level 0, in-vehicle distraction on, road-side distraction off)
scenario_0 = dict(
    level0_fraction=1.0,
    level1_fraction=0.0,
    level2_fraction=0.0,
    level3_fraction=0.0,
    in_vehicle_distraction=True,
    road_side_distraction=False,
)

In [None]:
# columns to read from the sequence output
# 'speed' is requested because later cells select df_gtu_headway_0['speed'] and
# multiply it with the headway time; without it those cells raise a KeyError.
# NOTE(review): assumes sequenceOutputData.csv contains a 'speed' column - confirm
headway_variables = [
    'time', 'gtu_id', 'gtu_type', 'x_position', 'speed',
    'headway_time', 'headway_time_old', 'headway_distance',
    'leader_gtu_id', 'leader_gtu_type', 'lane', 'link',
]
df_gtu_headway_0 = load_scenario_dataframe(scenario_0, headway_variables, experiment_folder, sequence_output, input_values)

In [None]:
def show_headway_time_boxplot(df_list, labels, column, max_headway_time=4):
    """Draw one box per DataFrame for the values of `column` below a cut-off.

    Parameters
    ----------
    df_list : list of pandas.DataFrame
        One frame per box; each must contain `column`.
    labels : list of str
        Tick label for each box, in the same order as `df_list`.
    column : str
        Name of the column to plot (e.g. ``'headway_time'``).
    max_headway_time : float, optional
        Only values strictly below this threshold are plotted (default 4,
        the value that used to be hard-coded).

    Returns
    -------
    None
        The figure is shown with `plt.show()`.
    """
    # keep only values below the cut-off; missing values are dropped as well
    headway_data = []
    for df in df_list:
        values = df[column]
        headway_data.append(values[values < max_headway_time].dropna().tolist())

    # create plot
    fig, ax = plt.subplots(figsize=(10, 6))
    box = ax.boxplot(headway_data, showfliers=True, showmeans=True, patch_artist=True)

    # set colors; cycle through the palette so every box is coloured even when
    # there are more boxes than palette entries
    colors = ['lightblue', 'lightgreen', 'lightcoral', 'lightgray', 'lightyellow']
    for position, patch in enumerate(box['boxes']):
        patch.set(facecolor=colors[position % len(colors)])

    # tick labels are set on the axes instead of through boxplot(labels=...),
    # which newer matplotlib releases deprecate in favour of tick_labels
    ax.set_xticklabels(labels, rotation=45)

    # show plot
    ax.set_title("Boxplot for headway time per automation level")
    ax.set_xlabel("Vehicle Type")
    ax.set_ylabel("Headway time (s)")
    plt.show()

In [None]:
# distribution of the new headway time for scenario 0
show_headway_time_boxplot(
    df_list=[df_gtu_headway_0],
    labels=['Scenario 0'],
    column='headway_time',
)

In [None]:
# same plot for the old headway time, for comparison with the new one
show_headway_time_boxplot(
    df_list=[df_gtu_headway_0],
    labels=['Scenario 0'],
    column='headway_time_old',
)

Are there low or negative (new) headway times, and do they remain after GTUs involved in collisions are removed?

In [None]:
# count suspicious headway times before any collision filtering:
# "low" is strictly between 0 and 0.2, "negative" is strictly below 0
print('Low headway time:', len(df_gtu_headway_0.query('0 < headway_time < 0.2')))
print('Negative headway time:', len(df_gtu_headway_0.query('headway_time < 0')))

In [None]:
# collision records of all runs that match scenario 0
df_collisions_0 = load_scenario_collision_dataframe(
    scenario=scenario_0,
    folder=experiment_folder,
)

In [None]:
def remove_collided_gtus(df, df_coll):
    """Drop the rows of `df` whose GTU took part in a collision in the same seed and run.

    Parameters
    ----------
    df : pandas.DataFrame
        Data with ``seed``, ``run`` and ``gtu_id`` columns.
    df_coll : pandas.DataFrame
        Collision records with ``seed``, ``run``, ``id`` and ``leader_id``
        columns. Both ``id`` and ``leader_id`` count as involved; missing
        values are ignored. May be completely empty (no columns).

    Returns
    -------
    pandas.DataFrame
        A new frame holding the rows of `df` that are not linked to a
        collision; the original index labels are kept and `df` is not modified.
    """
    # nothing to filter, or no recorded collisions (an empty collision frame may
    # not even have the expected columns): return an unchanged copy
    if df.empty or df_coll.empty:
        return df.copy()

    # set of (seed, run, gtu) keys for every GTU involved in a collision
    collided = set()
    collision_columns = zip(df_coll['seed'], df_coll['run'], df_coll['id'], df_coll['leader_id'])
    for seed, run, gtu_id, leader_id in collision_columns:
        if pd.notna(gtu_id):
            collided.add((seed, run, gtu_id))
        if pd.notna(leader_id):
            collided.add((seed, run, leader_id))

    # iterate over the three key columns instead of building a Series per row
    # (df.apply(axis=1) is very slow on large frames); membership uses the same
    # Python equality as before, so 12 and 12.0 still match
    involved = pd.Series(
        [key in collided for key in zip(df['seed'], df['run'], df['gtu_id'])],
        dtype=bool,
    ).to_numpy()

    # Return new filtered dataframe
    return df[~involved]

In [None]:
# scenario 0 headway data without the GTUs that were involved in a collision
df_gtu_headway_0_filtered = remove_collided_gtus(
    df=df_gtu_headway_0,
    df_coll=df_collisions_0,
)

In [None]:
# same counts after removing collided GTUs:
# "low" is strictly between 0 and 0.2, "negative" is strictly below 0
print('Low headway time:', len(df_gtu_headway_0_filtered.query('0 < headway_time < 0.2')))
print('Negative headway time:', len(df_gtu_headway_0_filtered.query('headway_time < 0')))

Can we recreate headway_distance with the new headway_time and speed?

In [None]:
# first ten rows with the old headway time next to the logged headway distance
df_gtu_headway_0.head(10)[[
    'seed', 'run', 'time', 'gtu_id', 'gtu_type',
    'speed', 'headway_time_old', 'headway_distance',
]]

In [None]:
# recompute the headway distance as speed * (new) headway time on a copy,
# leaving df_gtu_headway_0 untouched
df_new = df_gtu_headway_0.assign(
    headway_distance_new=lambda frame: frame['speed'] * frame['headway_time']
)

# preview from df_new: the recomputed column exists only there, so selecting it
# from df_gtu_headway_0 would raise a KeyError
df_new[['seed', 'run', 'time', 'gtu_id', 'gtu_type', 'speed', 'headway_time', 'headway_distance_new']].head(10)