# Collision Type

In [157]:
import csv
import numpy as np
import matplotlib.pyplot as plt
import math
import pandas as pd
import os
import seaborn as sns
from pathlib import Path
from IPython.display import display
import ast
import pickle
# from env_utils import read_pso_search_files, detect_convergence
%matplotlib inline

from data_utils import find_match_from_seach_collector, get_unique_search_df, get_time_diff_df

In [158]:
def get_collision_type(x_ego, y_ego, v_ego, yaw_ego, x_npc, y_npc, v_npc, yaw_npc):
    # Calculate the relative position angle
    rel_angle = math.atan2(y_npc - y_ego, x_npc - x_ego)

    # Convert angles from radians to degrees for easier comparison
    rel_angle = abs(math.degrees(rel_angle))
    yaw_ego_deg = yaw_ego
    yaw_npc_deg = yaw_npc
    rel_yaw = abs(yaw_npc_deg - yaw_ego_deg)
    rel_angle_to_ego = abs(rel_angle - yaw_ego_deg) 

    if rel_yaw > 180:
            rel_yaw = 360 - rel_yaw

    if rel_angle_to_ego > 180:
            rel_angle_to_ego = 360 - rel_angle_to_ego

    col_type = ""

    # T-Bone collision
    if 80 <= rel_yaw <= 100:
        col_type = "C1"

    # Merge collision
    elif 0 < rel_yaw < 80 and 70 <= rel_angle_to_ego <= 170:
        col_type = "C2"

    # Head On collision
    elif 170 <= rel_yaw <= 180:
        col_type = "C3"

    # Front collision
    elif 100 <= rel_yaw <= 170:
        col_type = "C4"

    # Cutoff collision
    elif 0 <= rel_yaw < 80 and 0 <= rel_angle_to_ego < 70:
        col_type = "C5"

    # Behind collision
    elif 0 <= rel_yaw < 80 and 170 <= rel_angle_to_ego < 180:
        col_type = "C6"

    else:
        # If none of the conditions are met, return "Unknown"
        print(f"rel_angle_to_ego: {rel_angle_to_ego}, rel_yaw: {rel_yaw}")
        
        return None
    
    v_ego = abs(v_ego)
    v_npc = abs(v_npc)
    delta_v = abs(v_ego - v_npc)
    rel_v = v_npc - v_ego
    sub_type = ""

    threshold = 5

    if rel_v >threshold:
        sub_type = "H"
    elif -threshold <= rel_v <= threshold:
        sub_type = "M"
    elif rel_v < -threshold:
        sub_type = "L"

    final_col_type = col_type + sub_type
    return final_col_type


def get_all_collision_types(exp_path):
    search_df_unique = get_unique_search_df(exp_path)
    collision_types = {}
    for C in ["C1", "C2", "C3", "C4", "C5", "C6"]:
        for sub_type in ["H", "M", "L"]:
            collision_types[C+sub_type] = 0

    collision_status = search_df_unique["collision_status"]

    collision_status_list = []

    # parse each collision_status string into a dictionary
    for status in collision_status:
        dict_obj = ast.literal_eval(status)
        if len(dict_obj["EGO"]) == 0:
            continue
        collision_status_list.append(dict_obj)

    for status in collision_status_list:
        x_ego, y_ego, v_ego, yaw_ego = status["EGO"]
        x_npc, y_npc, v_npc, yaw_npc = status["NPC"]

        col_type = get_collision_type(
            x_ego, y_ego, v_ego, yaw_ego, x_npc, y_npc, v_npc, yaw_npc
        )

        if col_type is None:
            print(f"Unkonwn collision type exists:{status}")
            continue

        collision_types[col_type] += 1

    return collision_types

In [159]:
agents_data = {}

base_path = Path("../../output-final")

ADS_name = "apollo"

for scenario_name in os.listdir(base_path):
    scenario_path = os.path.join(base_path, scenario_name)
    if os.path.isdir(scenario_path):
        for agent_name in os.listdir(scenario_path):

            # Optinal: select one agent
            if agent_name != ADS_name:
                continue
            agent_path = os.path.join(scenario_path, agent_name)
            if os.path.isdir(agent_path):
                if agent_name not in agents_data:
                    agents_data[agent_name] = []
                for experiment_name in os.listdir(agent_path):

                    if "20x50" not in experiment_name:
                        continue

                    w = float(experiment_name.split("_")[1])

                    exp_path = os.path.join(agent_path, experiment_name)
                    collision_type_dict = get_all_collision_types(exp_path)

                    # get keys
                    types = list(collision_type_dict.keys())

                    # Add the scenario name and experiment name to the list of the agent
                    agents_data[agent_name].append(
                        (
                            scenario_name,
                            experiment_name,
                            w,
                            *[collision_type_dict[t] for t in types],
                        )
                    )

# Create a DataFrame for each agent
agents_dfs = {}
metrics = types
for agent_name, experiments in agents_data.items():
    # Create a DataFrame
    df = pd.DataFrame(
        experiments,
        columns=["Scenario", "Experiment", "w"] + metrics,
    )
    df = df.groupby("Scenario", group_keys=False).apply(lambda x: x.sort_values("w"))

    df.drop("Experiment", axis=1, inplace=True)
    agents_dfs[agent_name] = df


display(agents_dfs[ADS_name])
agents_dfs[ADS_name].to_csv("apollo_coltypes_detail.csv")

Unnamed: 0,Scenario,w,C1H,C1M,C1L,C2H,C2M,C2L,C3H,C3M,C3L,C4H,C4M,C4L,C5H,C5M,C5L,C6H,C6M,C6L
21,front_brake,0.0,0,0,0,0,33,1,0,0,0,0,0,0,0,0,0,0,65,8
24,front_brake,0.3,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,196,86
23,front_brake,0.5,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,259,87
20,front_brake,0.7,0,0,0,0,6,0,0,0,0,0,0,0,0,0,0,0,325,148
22,front_brake,1.0,0,0,0,0,1,2,0,1,0,0,0,0,0,0,0,0,479,21
16,front_cut_in_with_one_npc,0.0,0,0,0,0,0,0,0,0,0,0,0,0,0,2,0,0,0,0
19,front_cut_in_with_one_npc,0.3,0,0,0,2,3,0,0,0,0,0,0,0,2,4,0,0,0,0
15,front_cut_in_with_one_npc,0.5,0,0,0,0,30,31,0,0,0,0,0,0,0,14,13,0,1,0
17,front_cut_in_with_one_npc,0.7,0,0,0,0,35,19,0,0,0,0,0,0,0,68,11,0,0,0
18,front_cut_in_with_one_npc,1.0,0,0,0,22,119,0,0,0,0,0,0,0,6,4,0,0,0,0


In [160]:
# Based on agents_dfs[ADS_name], group by w, and show the metrics in a new df
agent_df_copy = agents_dfs[ADS_name].copy()
# drop scenario column
agent_df_copy.drop("Scenario", axis=1, inplace=True)
# if C1-C6 >=1 then 1, if <1 then 0
agent_df_copy[metrics] = agent_df_copy[metrics].map(lambda x: 1 if x >= 1 else 0)
grouped_df = agent_df_copy.groupby("w").sum()
grouped_df[metrics] = grouped_df[metrics].map(lambda x: 1 if x >= 1 else 0)

display(grouped_df)
# grouped_df.to_csv("apollo_coltypes.csv")

Unnamed: 0_level_0,C1H,C1M,C1L,C2H,C2M,C2L,C3H,C3M,C3L,C4H,C4M,C4L,C5H,C5M,C5L,C6H,C6M,C6L
w,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
0.0,1,1,1,1,1,1,1,0,0,1,1,1,1,1,0,1,1,1
0.3,1,1,1,1,1,0,1,0,0,1,1,1,1,1,1,0,1,1
0.5,1,1,1,1,1,1,0,1,0,1,1,0,1,1,1,1,1,1
0.7,1,1,1,1,1,1,0,0,0,1,1,0,1,1,1,1,1,1
1.0,1,1,1,1,1,1,0,1,0,1,1,1,1,1,1,0,1,1


In [161]:
w_c_dict = {}
# for each row in grouped_df, if the value > 0, then add the metric name to w_c_dict[w]
for index, row in grouped_df.iterrows():
    w_c_dict[index] = []
    for metric in metrics:
        if row[metric] > 0:
            w_c_dict[index].append(metric)

# display(w_c_dict)

# convert w_c_dict's value into string
w_c_dict_str = {}
for k, v in w_c_dict.items():
    w_c_dict_str[k] = ",".join(v)

# display(w_c_dict_str)

# make w_c_dict_str into a DataFrame
w_c_df = pd.DataFrame(list(w_c_dict_str.items()), columns=["w", "collision_types"])
# drop index
w_c_df.set_index("w", inplace=True)

display(w_c_df)
w_c_df.to_csv("apollo_coltypes_summary.csv")


Unnamed: 0_level_0,collision_types
w,Unnamed: 1_level_1
0.0,"C1H,C1M,C1L,C2H,C2M,C2L,C3H,C4H,C4M,C4L,C5H,C5..."
0.3,"C1H,C1M,C1L,C2H,C2M,C3H,C4H,C4M,C4L,C5H,C5M,C5..."
0.5,"C1H,C1M,C1L,C2H,C2M,C2L,C3M,C4H,C4M,C5H,C5M,C5..."
0.7,"C1H,C1M,C1L,C2H,C2M,C2L,C4H,C4M,C5H,C5M,C5L,C6..."
1.0,"C1H,C1M,C1L,C2H,C2M,C2L,C3M,C4H,C4M,C4L,C5H,C5..."


In [162]:
def type_is_distinct(type_name, w_c_df):
    appear_count = 0
    for index, row in w_c_df.iterrows():
        if type_name in row["collision_types"]:
            appear_count += 1
    row_count = w_c_df.shape[0]
    if appear_count < row_count:
        return True
    else:
        return False
    
distinct_type_list = []
for index, row in w_c_df.iterrows():
    for col_type in row["collision_types"].split(","):
        if col_type not in distinct_type_list:
            if type_is_distinct(col_type, w_c_df):
                distinct_type_list.append(col_type)

# order the list
distinct_type_list.sort()
# list to str
distinct_type_str = ",".join(distinct_type_list)
print(len(distinct_type_list))
print(distinct_type_str)

6
C2L,C3H,C3M,C4L,C5L,C6H
