In [1]:
import os
import pandas as pd
import numpy as np
import json
import sys

from matplotlib import pyplot as plt
from pandas.io.json import json_normalize

import logging
logger = logging.getLogger(__name__)

# These lines allow me to see logging.info messages in my jupyter cell output
logger.addHandler(logging.StreamHandler(stream=sys.stdout)) 
logger.setLevel(logging.DEBUG)

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

# Grab subject & trial info

In [2]:
class subject_data():

    def __init__(self, subject_data_folder):
        
        self.sub_id = os.path.split(subject_data_folder)[-1]
        self.data_parent_folder = os.path.join(*os.path.split(subject_data_folder)[:-1])
        self.data_folder = subject_data_folder
        
        self.experiment_settings = None # Making the attribute known
        self.read_experiment_settings_from_file() # self.experiment_settings is now set
        
        self.results = None
        self.read_trial_results_from_file()
 
    def get_trial_from_index(self,trial_index):
        
        return trial_data(self.results.iloc[trial_index],self.experiment_settings,self.data_folder)
    
    def read_experiment_settings_from_file(self):

        #settings_path = os.path.join(self.data_folder, 'S001', 'session_info', 'settings.json')
        settings_path = os.path.join(*os.path.split(self.data_folder), 'S001', 'session_info', 'settings.json')
            
        with open(settings_path) as json_data:
            self.experiment_settings = json.load(json_data)
            
    def read_trial_results_from_file(self):

        #trial_data_path = os.path.join(self.data_folder, 'S001')
        trial_data_path = os.path.join(*os.path.split(self.data_folder), 'S001')
        
        self.results = pd.read_csv( os.path.join( trial_data_path, 'trial_results.csv' ))

class trial_data():
    
    def __init__(self,trial_results_row_in, experiment_settings, subject_data_folder):
        
        self.data_parent_folder = os.path.join(*os.path.split(subject_data_folder)[:-1])
        self.data_folder = subject_data_folder
        
        self.results = trial_results_row_in
        self.experiment_settings = experiment_settings

        # Get raw data
        self.road_vertices = pd.read_csv(os.path.join(self.data_parent_folder, *trial_results_row_in['road_vertices_location_0'].split('/')[1:]))
        self.road_data = pd.read_csv(os.path.join(self.data_parent_folder, *trial_results_row_in['roadTransformMat_location_0'].split('/')[1:]))
        self.car_data = pd.read_csv(os.path.join(self.data_parent_folder,*trial_results_row_in['simplecar_carTransformMatrix_location_0'].split('/')[1:]))
        
    def time_of_deceleration(self):
        
        brake_positon_from_trial = self.car_data[['time', 'brake_position']]
        brake_positon_from_trial_array = np.array(brake_positon_from_trial)
        
        time_of_deceleration = np.nan
        
        for idx in range(0,len(brake_positon_from_trial)):
            if brake_positon_from_trial_array[idx,1] > 0.01955:
                time_of_deceleration = brake_positon_from_trial_array[idx,0]
                break
        
        return time_of_deceleration
    
    def time_to_stop(self, time_of_decel, time_of_full_stop):
        
        time_taken_to_break = np.nan
        
        if time_of_decel != np.nan and time_of_full_stop != np.nan:
            hold = time_of_full_stop - time_of_decel
            time_taken_to_break = round(hold,2)
        
        return time_taken_to_break
    
    def TTC_on_brake_onset(self, time_of_decel):
        
        car_data_from_trial = self.car_data[['time','car_speed', 'distance_to_obstacle']]
        car_data_from_trial_array = np.array(car_data_from_trial)
        speed = 0
        distance = 0
        ttc = np.nan
        
        for idx in range(0,len(car_data_from_trial)):
            if car_data_from_trial_array[idx,0] == time_of_decel:
                speed = car_data_from_trial_array[idx, 1]
                distance = car_data_from_trial_array[idx, 2]
                if distance != np.nan:
                    hold = distance/ speed
                    ttc = round(hold,2)
                break
        
        return ttc

# Analysis

In [3]:
## Multiple Subjects

data_parent_folder = 'raw_data'
filenames_cws = ['CWS']
cws_subjects_folder = os.path.join( data_parent_folder, filenames_cws[0])
cws_data_folders = os.listdir(cws_subjects_folder)


filenames_no_cws = ['No CWS']
no_cws_subjects_folder = os.path.join( data_parent_folder, filenames_no_cws[0])
no_cws_data_folders = os.listdir(no_cws_subjects_folder)

all_data = pd.DataFrame()
average_time_to_stop_subdata = pd.DataFrame()
average_TTC_after_brake_subdata = pd.DataFrame()
collision_data = pd.DataFrame()

for cws_files_idx in range(0,len(cws_data_folders)):
    directory = os.path.join(data_parent_folder, filenames_cws[0],cws_data_folders[cws_files_idx])
    time_taken_to_stop = []
    ttc_on_brake_onset = []
    this_sub_data = subject_data(directory)
    
    #Iterate through every trial for analysis for this one subject
    for idx, row in this_sub_data.results.iterrows():
    
        # Grab data from a specific trial
        a_trial = this_sub_data.get_trial_from_index(idx)
        trial_start_time = this_sub_data.results['start_time'][idx]
        time_of_stop = this_sub_data.results['time_car_stopped_and_obstacle_removed'][idx]
        distance_between_car_and_obs_when_stop = this_sub_data.results['distance_when_car_stopped_and_obstacle_removed'][idx]
        obs_vis = this_sub_data.results['obstacle_visibility'][idx]
        collision_occurred = this_sub_data.results['collision_has_occurred'][idx]
    
        # Calculate the time taken to come to a full stop after brake onset
        time_of_dec = a_trial.time_of_deceleration()
        time_to_stop = a_trial.time_to_stop(time_of_dec,time_of_stop)
        time_taken_to_stop.append(time_to_stop)
    
        # Calculate TTC on brake onset
        TTC = a_trial.TTC_on_brake_onset(time_of_dec)
        ttc_on_brake_onset.append(TTC)
    
    # Outputting some data
    this_sub_data.results['time_taken_to_stop'] = time_taken_to_stop
    this_sub_data.results['TTC_on_brake_onset'] = ttc_on_brake_onset
    if_collision_occurred = this_sub_data.results[['experiment','obstacle_visibility','collision_has_occurred']]
    collision_data = pd.concat([collision_data, if_collision_occurred])

    #collision_data = pd.concat([collision_data, collision_info])
    all_data = pd.concat([all_data, this_sub_data.results])
    
for no_cws_files_idx in range(0,len(no_cws_data_folders)):
    directory = os.path.join(data_parent_folder, filenames_no_cws[0],no_cws_data_folders[no_cws_files_idx])
    time_taken_to_stop = []
    ttc_on_brake_onset = []
    this_sub_data = subject_data(directory)
    
    #Iterate through every trial for analysis for this one subject
    for idx, row in this_sub_data.results.iterrows():
    
        # Grab data from a specific trial
        a_trial = this_sub_data.get_trial_from_index(idx)
        trial_start_time = this_sub_data.results['start_time'][idx]
        time_of_stop = this_sub_data.results['time_car_stopped_and_obstacle_removed'][idx]
        distance_between_car_and_obs_when_stop = this_sub_data.results['distance_when_car_stopped_and_obstacle_removed'][idx]
        obs_vis = this_sub_data.results['obstacle_visibility'][idx]
        collision_occurred = this_sub_data.results['collision_has_occurred'][idx]
    
        # Calculate the time taken to come to a full stop after brake onset
        time_of_dec = a_trial.time_of_deceleration()
        time_to_stop = a_trial.time_to_stop(time_of_dec,time_of_stop)
        time_taken_to_stop.append(time_to_stop)
    
        # Calculate TTC on brake onset
        TTC = a_trial.TTC_on_brake_onset(time_of_dec)
        ttc_on_brake_onset.append(TTC)
    
    # Outputting some data
    this_sub_data.results['time_taken_to_stop'] = time_taken_to_stop
    this_sub_data.results['TTC_on_brake_onset'] = ttc_on_brake_onset
    if_collision_occurred = this_sub_data.results[['experiment','obstacle_visibility','collision_has_occurred']]
    collision_data = pd.concat([collision_data, if_collision_occurred])
    
    all_data = pd.concat([all_data, this_sub_data.results])

# Saving info
avg_time_to_stop = all_data.groupby(by=['experiment','obstacle_visibility'], dropna=True)['time_taken_to_stop'].mean().round(2)
time_to_stop_std = all_data.groupby(by=['experiment','obstacle_visibility'], dropna=True)['time_taken_to_stop'].std().round(2)
Duration_of_Deceleration = pd.concat([avg_time_to_stop.rename('Average Duration of Deceleration'), time_to_stop_std.rename('StdDev')], axis=1)
print("Average Time Taken to Come to a Complete Stop After Brake onset (seconds)",avg_time_to_stop,sep='\n')
avg_TTC_after_brake = all_data.groupby(by=['experiment','obstacle_visibility'], dropna=True)['TTC_on_brake_onset'].mean().round(2)
TTC_after_brake_std = all_data.groupby(by=['experiment','obstacle_visibility'], dropna=True)['TTC_on_brake_onset'].std().round(2)
TTC_on_Brake_Onset = pd.concat([avg_TTC_after_brake.rename('Average TTC on brake onset'), TTC_after_brake_std.rename('StdDev')], axis=1)
print("Average TTC After Brake onset (seconds)",avg_TTC_after_brake,sep='\n')
avg_collision_data = collision_data.groupby(by=['experiment','obstacle_visibility'], dropna=True)['collision_has_occurred'].mean().mul(100).round(1).astype(str) + '%'

all_data.to_csv('C:/Users/irosa/OneDrive/Desktop/RIT/Senior Project/Results/all_data.csv')
avg_collision_data.to_csv('C:/Users/irosa/OneDrive/Desktop/RIT/Senior Project/Results/Collision_Occurance.csv')
Duration_of_Deceleration.to_csv('C:/Users/irosa/OneDrive/Desktop/RIT/Senior Project/Results/Duration_of_Deceleration.csv')
TTC_on_Brake_Onset.to_csv('C:/Users/irosa/OneDrive/Desktop/RIT/Senior Project/Results/TTC_on_Brake_Onset.csv')


Average Time Taken to Come to a Complete Stop After Brake onset (seconds)
experiment  obstacle_visibility
CWS         0                       NaN
            3                      2.70
            12                     2.32
            100                    2.74
No CWS      0                       NaN
            3                       NaN
            12                     2.05
            100                    2.33
Name: time_taken_to_stop, dtype: float64
Average TTC After Brake onset (seconds)
experiment  obstacle_visibility
CWS         0                       NaN
            3                      2.31
            12                     2.47
            100                    2.29
No CWS      0                       NaN
            3                      1.39
            12                     1.24
            100                    1.87
Name: TTC_on_brake_onset, dtype: float64


In [4]:
mean = all_data.groupby(by=['experiment','obstacle_visibility'], dropna=True)['time_taken_to_stop'].mean().round(2)
std = all_data.groupby(by=['experiment','obstacle_visibility'], dropna=True)['time_taken_to_stop'].std().round(2)

test = pd.concat([mean, std], axis=1)
test

Unnamed: 0_level_0,Unnamed: 1_level_0,time_taken_to_stop,time_taken_to_stop
experiment,obstacle_visibility,Unnamed: 2_level_1,Unnamed: 3_level_1
CWS,0,,
CWS,3,2.7,0.53
CWS,12,2.32,0.79
CWS,100,2.74,0.68
No CWS,0,,
No CWS,3,,
No CWS,12,2.05,1.05
No CWS,100,2.33,0.78


In [5]:
## Single Subject
'''
data_parent_folder = 'raw_data'
#filenames = ['ImergenWstopsFull']
#filenames = ['Imergen_021423_sampleset']
filenames = ['CWS\Imergen_03032023_sampleset']

subNum = 0
subject_data_folder = os.path.join( data_parent_folder, filenames[subNum])
this_sub_data = subject_data(subject_data_folder)

time_taken_to_stop = []
ttc_on_brake_onset = []
#print("obs_vis",", ","collision_occurred",", ","time_to_stop")

#Iterate through every trial for analysis for this one subject
for idx, row in this_sub_data.results.iterrows():
    
    # Grab data from a specific trial
    a_trial = this_sub_data.get_trial_from_index(idx)
    trial_start_time = this_sub_data.results['start_time'][idx]
    time_of_stop = this_sub_data.results['time_car_stopped_and_obstacle_removed'][idx]
    distance_between_car_and_obs_when_stop = this_sub_data.results['distance_when_car_stopped_and_obstacle_removed'][idx]
    obs_vis = this_sub_data.results['obstacle_visibility'][idx]
    collision_occurred = this_sub_data.results['collision_has_occurred'][idx]
    
    # Calculate the time taken to come to a full stop after brake onset
    time_of_dec = a_trial.time_of_deceleration()
    time_to_stop = a_trial.time_to_stop(time_of_dec,time_of_stop)
    time_taken_to_stop.append(time_to_stop)
    #print(obs_vis,", ",collision_occurred,", ",time_to_stop)
    
    # Calculate TTC on brake onset
    TTC = a_trial.TTC_on_brake_onset(time_of_dec)
    ttc_on_brake_onset.append(TTC)

# Calculating fraction of collisions over the time of the participant's session
collision_info = this_sub_data.collision_information()

# Outputting some data
this_sub_data.results['time_taken_to_stop'] = time_taken_to_stop
this_sub_data.results['TTC_on_brake_onset'] = ttc_on_brake_onset

# Displaying some info
print("Collision Information",collision_info,sep='\n')
avg_time_to_stop = this_sub_data.results.groupby(by=['experiment','obstacle_visibility'], dropna=True)['time_taken_to_stop'].mean().round(2)
print("Average Time Taken to Come to a Complete Stop After Brake onset (seconds)",avg_time_to_stop,sep='\n')
average_TTC_after_brake = this_sub_data.results.groupby(by=['experiment','obstacle_visibility'], dropna=True)['TTC_on_brake_onset'].mean().round(2)
print("Average TTC After Brake onset (seconds)",average_TTC_after_brake,sep='\n')
'''

'\ndata_parent_folder = \'raw_data\'\n#filenames = [\'ImergenWstopsFull\']\n#filenames = [\'Imergen_021423_sampleset\']\nfilenames = [\'CWS\\Imergen_03032023_sampleset\']\n\nsubNum = 0\nsubject_data_folder = os.path.join( data_parent_folder, filenames[subNum])\nthis_sub_data = subject_data(subject_data_folder)\n\ntime_taken_to_stop = []\nttc_on_brake_onset = []\n#print("obs_vis",", ","collision_occurred",", ","time_to_stop")\n\n#Iterate through every trial for analysis for this one subject\nfor idx, row in this_sub_data.results.iterrows():\n    \n    # Grab data from a specific trial\n    a_trial = this_sub_data.get_trial_from_index(idx)\n    trial_start_time = this_sub_data.results[\'start_time\'][idx]\n    time_of_stop = this_sub_data.results[\'time_car_stopped_and_obstacle_removed\'][idx]\n    distance_between_car_and_obs_when_stop = this_sub_data.results[\'distance_when_car_stopped_and_obstacle_removed\'][idx]\n    obs_vis = this_sub_data.results[\'obstacle_visibility\'][idx]\n    

In [6]:
'''
## One Trial
data_parent_folder = 'raw_data'
filenames = ['ImergenWstopsFull']

subNum = 0
subject_data_folder = os.path.join( data_parent_folder, filenames[subNum])
this_sub_data = subject_data(subject_data_folder)
a_trial = this_sub_data.get_trial_from_index(33)
'''

"\n## One Trial\ndata_parent_folder = 'raw_data'\nfilenames = ['ImergenWstopsFull']\n\nsubNum = 0\nsubject_data_folder = os.path.join( data_parent_folder, filenames[subNum])\nthis_sub_data = subject_data(subject_data_folder)\na_trial = this_sub_data.get_trial_from_index(33)\n"

In [13]:
agg_data_df = all_data.groupby(by=['experiment','obstacle_visibility']).agg([np.nanmean,np.nanstd])


  agg_data_df = all_data.groupby(by=['experiment','obstacle_visibility']).agg([np.nanmean,np.nanstd])


In [17]:
#list(agg_data_df.columns)
agg_data_df[('TTC_on_brake_onset', 'nanmean')]

experiment  obstacle_visibility
CWS         0                           NaN
            3                      2.306250
            12                     2.472500
            100                    2.288750
No CWS      0                           NaN
            3                      1.388571
            12                     1.240000
            100                    1.866250
Name: (TTC_on_brake_onset, nanmean), dtype: float64

In [19]:
agg_data_df[('TTC_on_brake_onset')]


Unnamed: 0_level_0,Unnamed: 1_level_0,nanmean,nanstd
experiment,obstacle_visibility,Unnamed: 2_level_1,Unnamed: 3_level_1
CWS,0,,
CWS,3,2.30625,0.127272
CWS,12,2.4725,0.436013
CWS,100,2.28875,0.150849
No CWS,0,,
No CWS,3,1.388571,1.594579
No CWS,12,1.24,0.839541
No CWS,100,1.86625,0.509592


In [20]:
import seaborn as sb

ModuleNotFoundError: No module named 'seaborn'