In [89]:
import ast
import os
import re

import pandas as pd
# Directory of the "v2" experiment outputs, resolved against the current working directory.
# NOTE(review): the In [348] cell reassigns input_dir to the ".../run2" directory before
# any log file is read, so this value is overridden there.
input_dir = os.getcwd() + "/out/highway_env_continuous_actions/experiment/v2"

Marked

In [347]:
import ast
def _extract_config(lines):
    switch_config_target = "Switch configurations:"
    pctrl_only_target = "'--pctrl-only'"
    data = {}
    found_switch_config = False
    for _, line in enumerate(lines):
        if pctrl_only_target in line:
            pctrl_only_str = line.strip().replace("--", "").split(':')[1].strip()
            pctrl_only = ast.literal_eval(pctrl_only_str)
            data['pctrl_only'] = pctrl_only[0]
        elif switch_config_target in line:
            found_switch_config = True
        elif found_switch_config:
            # Safely evaluate the config string to a Python list containing a dictionary
            item_list = ast.literal_eval(line)
            if isinstance(item_list, list) and len(item_list) > 0 and isinstance(item_list[0], dict):
                record = item_list[0]
                ctrl_id = record.get('ctrl_id')
                thresh = record.get('thresh', {})
                time_delay = thresh.get('time_delay')
                cross_lane_headway = thresh.get('cross_lane_headway')
                data['ctrl_id'] = ctrl_id
                data['time_delay'] = time_delay
                data['cross_lane_headway'] = cross_lane_headway        
            found_switch_config = False  
    return data

def _extract_switch_probs(lines):
    _dict = {}
    for line in lines:
        if line.startswith("[probabilities]"):
            # Use regular expression to find all numbers in the line
            numbers_str = re.findall(r"[-+]?\d*\.\d+|\d+", line)
            if len(numbers_str) >= 2:
                try:
                    _dict["prob_to_pri_ctrl"] = float(numbers_str[0])
                    _dict["prob_to_sec_ctrl"] = float(numbers_str[0])
                except ValueError:
                    print(f"Warning: Could not convert first two elements to numbers in line: {line.strip()}")
    return _dict

def _extract_exit_probs(lines):
    _dict = {}
    for line in lines:
        if line.startswith("[counts]"):
            # Use regular expression to find all numbers in the line
            numbers_str = re.findall(r"[-+]?\d*\.\d+|\d+", line)
            if len(numbers_str) >= 3:
                try:
                    num1 = int(numbers_str[1])
                    num2 = int(numbers_str[2])
                    if num1 * num2 < 0 or num1 + num2 <= 0:
                        raise ValueError("Both counts must be non-negative and at least one count must be positive")
                    p = num1 / float(num1 + num2)
                    _dict["prob_to_exit_sec_ctrl"] = p
                    _dict["prob_to_stay_in_sec_ctrl"] = 1 - p
                except ValueError:
                    print(f"Warning: Could not convert first three elements to numbers in line: {line.strip()}")
    return _dict

def _extract_data_to_dataframe(lines):
    extracted_data = [] 
    for line in lines:
        if line.startswith("[data]"):
                data_part = line[len("[data]"):].strip()
                pairs = data_part.split(', ')
                data_dict = {}
                for pair in pairs:
                    if ':' in pair:
                        key, value = pair.split(': ', 1)
                        data_dict[key.strip()] = value.strip()
                extracted_data.append(data_dict)
    if extracted_data:
        df = pd.DataFrame(extracted_data)
        df['headway'] = df['headway'].astype('float')
        # df['cross_lane_headway'] = df['cross_lane_headway'].astype('float')
        df['ttc'] = df['ttc'].astype('float')
        def bool_str_to_num(x):
            if x=='True':
                return 1  
            else:
                return 0
        df['on lane'] = df['on lane'].apply(bool_str_to_num)
        df = df.drop(['cross_lane_headway'], axis=1)
        return df
    else:
        raise ValueError("Empty data")  # Return an empty DataFrame

# Default cut-offs used to bucket the [data] rows. The numbers equal the values a later
# cell displays for (df['headway'].median(), df['ttc'].quantile(0.25)).
HEADWAY_THRESH = 6.4737
TTC_THRESH = 77.2788
print(f"HEADWAY_THRESH = {HEADWAY_THRESH}", f"TTC_THRESH = {TTC_THRESH}", sep="\n")

def _extract_data_statistics(lines, headway_thresh=HEADWAY_THRESH, ttc_thresh=TTC_THRESH):
    """Compute the share of ``[data]`` rows falling in each headway/TTC/on-lane cell.

    The rows are split by ``headway <= / > headway_thresh``,
    ``ttc <= / > ttc_thresh`` and ``on lane == 0 / 1``, giving eight cells.

    Args:
        lines: iterable of log lines, parsed with ``_extract_data_to_dataframe``.
        headway_thresh: headway cut-off (defaults to ``HEADWAY_THRESH``).
        ttc_thresh: TTC cut-off (defaults to ``TTC_THRESH``).

    Returns:
        dict mapping ``headway_{le|gt}_ttc_{le|gt}_on_lane_{0|1}`` to the
        fraction of rows in that cell.

    Raises:
        ValueError: propagated from ``_extract_data_to_dataframe`` when the
            log has no ``[data]`` line.
    """
    df = _extract_data_to_dataframe(lines)
    n_rows = len(df)

    # Select columns by name rather than position, so the labels below stay
    # correct even if the log lists its keys in a different order.
    headway, ttc, on_lane = df['headway'], df['ttc'], df['on lane']
    headway_masks = {'le': headway <= headway_thresh, 'gt': headway > headway_thresh}
    ttc_masks = {'le': ttc <= ttc_thresh, 'gt': ttc > ttc_thresh}

    proportions = {}
    for val3 in [0, 1]:
        lane_mask = on_lane == val3
        for op2, ttc_mask in ttc_masks.items():
            for op1, headway_mask in headway_masks.items():
                subset_name = f"headway_{op1}_ttc_{op2}_on_lane_{val3}"
                n_match = int((headway_mask & ttc_mask & lane_mask).sum())
                proportions[subset_name] = n_match / n_rows if n_rows > 0 else 0.0

    return proportions


HEADWAY_THRESH = 6.4737
TTC_THRESH = 77.2788


In [348]:
import os

# pandas is imported before the loop: the parsing helpers called below build DataFrames,
# so importing it afterwards only worked when an earlier cell had already loaded it.
import pandas as pd

# Walk the run directory and turn every .txt log into one summary record.
input_dir = os.getcwd() + "/out/highway_env_continuous_actions/experiment/run2"
directory = input_dir
list_of_dicts = []
for root, _, files in os.walk(directory):
    for file in files:
        if not file.endswith(".txt"):
            continue
        file_path = os.path.join(root, file)
        try:
            with open(file_path, 'r') as f:
                lines = f.readlines()
            _dict = _extract_config(lines)
            _dict.update(_extract_switch_probs(lines))
            _dict.update(_extract_exit_probs(lines))
            _dict.update(_extract_data_statistics(lines))
            list_of_dicts.append(_dict)
        except Exception as e:
            # Best effort: report the file that failed (read or parse) and keep going.
            print(f"Error reading file {file_path}: {e}")

# One row per successfully parsed log, ordered by the pctrl_only flag.
pd.DataFrame(list_of_dicts).sort_values(by="pctrl_only", ascending=True)

Unnamed: 0,pctrl_only,ctrl_id,time_delay,cross_lane_headway,prob_to_pri_ctrl,prob_to_sec_ctrl,prob_to_exit_sec_ctrl,prob_to_stay_in_sec_ctrl,headway_le_ttc_le_on_lane_0,headway_gt_ttc_le_on_lane_0,headway_le_ttc_gt_on_lane_0,headway_gt_ttc_gt_on_lane_0,headway_le_ttc_le_on_lane_1,headway_gt_ttc_le_on_lane_1,headway_le_ttc_gt_on_lane_1,headway_gt_ttc_gt_on_lane_1
0,False,ctrl_1,0.1,4.0,0.393487,0.393487,0.445115,0.554885,0.027858,0.01111,0.065614,0.137911,0.177527,0.03378,0.232539,0.313661
1,False,ctrl_1,0.2,4.0,0.61074,0.61074,0.299117,0.700883,0.0498,0.012803,0.067793,0.097638,0.283774,0.027625,0.275354,0.185213
2,False,ctrl_2,0.1,4.0,0.393487,0.393487,0.397167,0.602833,0.026328,0.009655,0.062339,0.135904,0.177445,0.033681,0.23692,0.317728
4,False,ctrl_2,0.2,4.0,0.61074,0.61074,0.30768,0.69232,0.046163,0.011092,0.066956,0.097365,0.293107,0.024298,0.273791,0.187228
3,True,ctrl_1,0.2,4.0,0.61074,0.61074,,,0.084886,0.006366,0.044151,0.018513,0.362407,0.009928,0.368948,0.104801


In [277]:
import re
def _extract_probabilities(file_path):
    try:
        with open(file_path, 'r') as f:
            for line in f:
                if line.startswith("[probabilities]"):
                    # Use regular expression to find all numbers in the line
                    numbers_str = re.findall(r"[-+]?\d*\.\d+|\d+", line)
                    if len(numbers_str) >= 2:
                        try:
                            num1 = float(numbers_str[0])
                            num2 = float(numbers_str[1])
                            return {"to_pri_ctrl": num1, "to_sec_ctrl": num2}
                        except ValueError:
                            print(f"Warning: Could not convert first two elements to numbers in line: {line.strip()}")
    except FileNotFoundError:
        print(f"Error: File not found at path: {file_path}")
    except Exception as e:
        print(f"An error occurred: {e}")

In [278]:
import os
# Log file to inspect; the 20250410_201541 run is kept as a commented-out alternative.
# file = os.getcwd() + "/out/highway_env_continuous_actions/experiment/20250410_201541/" + "log.txt"
file = os.getcwd() + "/out/highway_env_continuous_actions/experiment/20250411_162613/" + "log.txt"
# Show the two probabilities parsed from the log's first usable [probabilities] line.
_extract_probabilities(file)

{'to_pri_ctrl': 0.22120925981345352, 'to_sec_ctrl': 0.6065127954211831}

In [173]:
# %reset




Once deleted, variables cannot be recovered. Proceed (y/[n])?  n


Nothing done.


Once deleted, variables cannot be recovered. Proceed (y/[n])?  n


marked 2

In [250]:
import pandas as pd

def extract_data_to_dataframe(input_file):
    """
    Extracts lines starting with '[data]' from a text file,
    parses the key-value pairs, and returns a pandas DataFrame.

    Each '[data]' line is split on ', ' into 'key: value' pairs. A pair written
    without the space ('key:value') is accepted as well; pairs with no ':' are
    ignored. All values are kept as strings.

    Args:
        input_file (str): Path to the input text file.

    Returns:
        pandas.DataFrame: A DataFrame containing the extracted data.
                          Returns an empty DataFrame if no matching lines are found
                          or if there's an error (the error is printed).
    """
    extracted_data = []
    try:
        with open(input_file, 'r') as infile:
            for line in infile:
                if not line.startswith("[data]"):
                    continue
                data_dict = {}
                for pair in line[len("[data]"):].strip().split(', '):
                    if ':' not in pair:
                        continue
                    # Prefer the "key: value" form; fall back to a bare ':' so one
                    # pair without the space no longer discards the whole file.
                    separator = ': ' if ': ' in pair else ':'
                    key, _, value = pair.partition(separator)
                    data_dict[key.strip()] = value.strip()
                extracted_data.append(data_dict)

        if extracted_data:
            return pd.DataFrame(extracted_data)
        return pd.DataFrame()  # No '[data]' lines: return an empty DataFrame

    except FileNotFoundError:
        print(f"Error: Input file '{input_file}' not found.")
        return pd.DataFrame()
    except Exception as e:
        print(f"An error occurred: {e}")
        return pd.DataFrame()

In [251]:
import os

# Log file to analyse; the 20250410_201541 run is kept as a commented-out alternative.
# file = os.getcwd() + "/out/highway_env_continuous_actions/experiment/20250410_201541/" + "log.txt"
file = os.getcwd() + "/out/highway_env_continuous_actions/experiment/20250411_162613/" + "log.txt"


def bool_str_to_num(x):
    """Map the string 'True' to 1 and any other value to 0."""
    return 1 if x == 'True' else 0


# The parser returns every value as a string: cast the numeric columns, encode the
# on-lane flag as 0/1, and drop the cross-lane headway column.
# df['cross_lane_headway'] = df['cross_lane_headway'].astype('float')
df = extract_data_to_dataframe(file)
df = df.astype({'headway': 'float', 'ttc': 'float'})
df['on lane'] = df['on lane'].apply(bool_str_to_num)
df = df.drop(columns=['cross_lane_headway'])

In [256]:
# Median headway and first-quartile TTC of the parsed log. The displayed values
# (6.4737, 77.2788) are the ones hard-coded as HEADWAY_THRESH / TTC_THRESH in the notebook.
(df['headway'].median(), df['ttc'].quantile(0.25))

(6.4737, 77.2788)

In [271]:
def calculate_subset_proportions(df, headway_thresh, ttc_thresh):
    """Share of rows in each of the eight threshold/flag combinations.

    The frame is read positionally: column 0 is compared against
    ``headway_thresh``, column 1 against ``ttc_thresh`` and column 2 is
    matched against 0 and 1.

    Args:
        df: DataFrame with at least three columns, laid out as described above.
        headway_thresh: cut-off applied to column 0 (``<=`` vs ``>``).
        ttc_thresh: cut-off applied to column 1 (``<=`` vs ``>``).

    Returns:
        dict mapping
        ``headway_{le|gt}_{headway_thresh}_ttc_{le|gt}_{ttc_thresh}_on_lane_{0|1}``
        to the fraction of rows in that combination (0.0 for an empty frame).
    """
    total = len(df)
    first_col, second_col, third_col = df.iloc[:, 0], df.iloc[:, 1], df.iloc[:, 2]

    # Boolean masks for each side of the two cut-offs, keyed by the label fragment.
    headway_masks = {'le': first_col <= headway_thresh, 'gt': first_col > headway_thresh}
    ttc_masks = {'le': second_col <= ttc_thresh, 'gt': second_col > ttc_thresh}

    result = {}
    for lane_flag in (0, 1):
        lane_mask = third_col == lane_flag
        for ttc_op, ttc_mask in ttc_masks.items():
            for headway_op, headway_mask in headway_masks.items():
                label = f"headway_{headway_op}_{headway_thresh}_ttc_{ttc_op}_{ttc_thresh}_on_lane_{lane_flag}"
                matched = len(df[headway_mask & ttc_mask & lane_mask])
                result[label] = matched / total if total > 0 else 0.0

    return result

In [272]:
# Proportion of rows in each headway/TTC/on-lane combination for the thresholds 6.4737 and 77.2788.
# NOTE(review): the saved output's keys read "headway_..._headway_..." whereas the function as
# written emits "headway_..._ttc_...", so the output looks stale — re-run the cell to refresh it.
calculate_subset_proportions(df, 6.4737, 77.2788)

{'headway_le_6.4737_headway_le_77.2788_on_lane_0': 0.027742713680608152,
 'headway_gt_6.4737_headway_le_77.2788_on_lane_0': 0.010968049594659036,
 'headway_le_6.4737_headway_gt_77.2788_on_lane_0': 0.06440573368116918,
 'headway_gt_6.4737_headway_gt_77.2788_on_lane_0': 0.13178490280232263,
 'headway_le_6.4737_headway_le_77.2788_on_lane_1': 0.1762181267356728,
 'headway_gt_6.4737_headway_le_77.2788_on_lane_1': 0.035092148447361776,
 'headway_le_6.4737_headway_gt_77.2788_on_lane_1': 0.23167550281915342,
 'headway_gt_6.4737_headway_gt_77.2788_on_lane_1': 0.32211282223905296}

In [48]:
# Display the parsed frame.
# NOTE(review): the saved output still lists cross_lane_headway, which the In [251] cell drops —
# presumably captured by an earlier execution; confirm by re-running.
df

Unnamed: 0,headway,cross_lane_headway,ttc,on lane
0,0.9272,0.927185,inf,0
1,5.3966,1.067731,283.3013,1
2,9.0450,1.141168,3755.6514,1
3,9.3800,1.213120,inf,1
4,6.2088,1.306240,3203.6357,1
...,...,...,...,...
35644,7.0721,6.131449,51.3397,1
35645,4.9349,4.898515,17.7554,1
35646,3.8916,3.886544,10.0707,1
35647,4.7832,4.116016,16.0853,1


In [44]:
# Column dtypes of the parsed frame.
# NOTE(review): the saved output shows float32 columns and cross_lane_headway, which does not match
# the astype('float') / drop steps of the In [251] cell — presumably from an earlier run; confirm.
df.dtypes

headway               float32
cross_lane_headway    float32
ttc                   float32
on lane                 int64
dtype: object

In [37]:
# Peek at the first 'on lane' value.
# NOTE(review): the saved output is True, while the In [251] cell encodes this column as 0/1 —
# presumably captured before that conversion existed; confirm by re-running.
df['on lane'][0]

True

In [None]:
# NOTE(review): unfinished draft — the line ends in an unterminated subscript/string (t['), so this
# cell cannot run. Also, `&` binds tighter than `<` in Python, so each comparison needs its own
# parentheses, e.g. (df['headway'] < 3) & (df['ttc'] < 3), before it can be combined.
condition1 = df['headway'] < 3 & df['ttc']< 3 & t['

In [193]:
# Scratch check: show the type of the literal 1.0.
type(1.0)

float

In [195]:
# Scratch check: compare the type of the literal 1.0 with the built-in float.
type(1.0) == float

True