## 1 - Data Preparation

##### Imports

In [None]:
import os
import pandas as pd
import numpy as np
import dask.dataframe as dd
import dask.array as da     #scalable parallel computing
import json
import warnings

### 1. File Aggregation
Reduce the number of .csv files by aggregating each export file type across all participants into a single file.

In [2]:
# Set root directory: the folder that is scanned for one export sub-folder per participant.
# It is created when missing so the cells below can list it without failing.
# NOTE(review): later cells rebind `root_dir` to './aggregated_data'; get_path/get_info read this global.
root_dir = './exports'
os.makedirs(root_dir, exist_ok=True)

In [6]:
def reorder_columns(ddf):
    """Return ``ddf`` with 'participant_id' as the first column.

    All other columns keep their relative order. Indexing raises a KeyError
    if the frame has no 'participant_id' column.
    """
    remaining = [name for name in ddf.columns if name != 'participant_id']
    return ddf[['participant_id', *remaining]]

def aggregate_files(root_dir, output_dir, file_name, sub = False, excluded_participants = ('P03',)):
    """Aggregate one CSV file type across all participant folders into a single CSV.

    Every sub-directory of ``root_dir`` is treated as a participant folder whose
    name is one prefix character followed by the participant number (e.g. 'P07').
    The file is read from each folder, tagged with a leading 'participant_id'
    column and concatenated. The result is written to
    ``<output_dir>/all_<file_name>``.

    Parameters
    ----------
    root_dir : str
        Directory that contains the participant folders.
    output_dir : str
        Directory the aggregated file is written to (created if missing).
    file_name : str
        Name of the CSV file to collect from every participant.
    sub : bool, default False
        If True, look for the file inside each participant's 'surfaces' subfolder.
    excluded_participants : collection of str, default ('P03',)
        Folder names that are skipped entirely.

    Returns
    -------
    None
        The function only writes a file and prints status messages.
    """
    # These files start with a total-count row; their real header is on the second row.
    count_prefixed_files = ('surface_gaze_distribution.csv', 'surface_visibility.csv')

    os.makedirs(output_dir, exist_ok=True)
    all_ddfs = []
    for participant_folder in sorted(os.listdir(root_dir)):
        participant_path = os.path.join(root_dir, participant_folder)
        if not os.path.isdir(participant_path) or participant_folder in excluded_participants:
            continue  # skip non-directories and excluded participants

        if sub:
            file_path = os.path.join(participant_path, 'surfaces', file_name)
        else:
            file_path = os.path.join(participant_path, file_name)
        if not os.path.exists(file_path):
            print(f"File {file_name} not found in {participant_folder}")
            continue

        # Stray directories (e.g. '.ipynb_checkpoints') do not encode a participant
        # number; skip them instead of aborting the whole aggregation.
        try:
            participant_id = int(participant_folder[1:])
        except ValueError:
            print(f"Skipping {participant_folder}: folder name does not encode a participant number")
            continue

        if file_name in count_prefixed_files:
            df = pd.read_csv(file_path, header=None)
            col_names = df.iloc[1].tolist()  # second row holds the column names
            df = df.iloc[2:]                 # remaining rows hold the values
            df.columns = col_names
            ddf = dd.from_pandas(df, npartitions=1)
        else:
            ddf = dd.read_csv(file_path)
        ddf['participant_id'] = participant_id
        ddf = reorder_columns(ddf)  # 'participant_id' becomes the first column
        all_ddfs.append(ddf)

    if all_ddfs:
        aggregated_ddf = dd.concat(all_ddfs)
        aggregated_ddf.to_csv(os.path.join(output_dir, f'all_{file_name}'), single_file=True)
    else:
        print(f"No data found for {file_name}")

In [1]:
# Tests / Single files
# aggregate_files(root_dir, './aggregated_data', 'gaze_positions.csv') # main folder
# aggregate_files(root_dir, './aggregated_data/all_surfaces', 'fixations_on_surface_HiDrive_Studie2.csv', sub = True) # subfolder
# aggregate_files(root_dir, './aggregated_data/all_surfaces', 'surface_visibility.csv', sub = True) # exception1 in subfolder

In [5]:
# Lists of files to aggregate
# File names looked up directly inside each participant folder (aggregated with sub=False).
files_to_aggregate = [
    'blink_detection_report.csv',
    'blinks.csv',
    'export_info.csv',
    'fixation_report.csv',
    'fixations.csv',
    'gaze_positions.csv',
    'pupil_positions.csv',
    'world_timestamps.csv',
]

# File names looked up inside each participant's 'surfaces' subfolder (aggregated with sub=True).
files_to_aggregate_sub = [
    'fixations_on_surface_HiDrive_Studie2.csv',
    'gaze_positions_on_surface_HiDrive_Studie2.csv',
    'marker_detections.csv',
    'surf_positions_HiDrive_Studie2.csv',
    'surface_events.csv',
    'surface_gaze_distribution.csv',
    'surface_visibility.csv',
]

In [None]:
# Run the aggregation for the files located directly in each participant folder
for csv_name in files_to_aggregate:
    aggregate_files(root_dir, output_dir='./aggregated_data', file_name=csv_name)

In [8]:
# Run the aggregation for the files located in each participant's 'surfaces' subfolder
for csv_name in files_to_aggregate_sub:
    aggregate_files(root_dir, output_dir='./aggregated_data/all_surfaces', file_name=csv_name, sub=True)

### 2. Access to other types of file or info
Function definitions for retrieving files or information that cannot be aggregated (.mp4 videos, .png heatmaps, total surface gaze count, total surface visibility count).

In [None]:
# Get file path for videos and images
def get_path(participant_folder, file_type, base_dir=None):
    """Return the path of a participant's world video or surface heatmap image.

    Parameters
    ----------
    participant_folder : str
        Name of the participant folder, e.g. 'P01'.
    file_type : str
        'mp4' for '<participant>/world.mp4' or 'png' for
        '<participant>/surfaces/heatmap_HiDrive_Studie2.png'.
    base_dir : str, optional
        Directory containing the participant folders. When omitted, the
        module-level ``root_dir`` is used. Pass it explicitly when ``root_dir``
        has been rebound to another directory by a later cell.

    Returns
    -------
    str or None
        The file path if the file exists. Otherwise None, after printing the
        reason (unknown participant, invalid type or missing file).
    """
    if base_dir is None:
        base_dir = root_dir  # fall back to the notebook-level setting
    participant_path = os.path.join(base_dir, participant_folder)

    if not os.path.isdir(participant_path):
        print(f"Participant folder {participant_folder} does not exist.")
        return None

    # Location of each supported file type relative to the participant folder.
    relative_locations = {
        'mp4': ('world.mp4',),
        'png': ('surfaces', 'heatmap_HiDrive_Studie2.png'),
    }
    if file_type not in relative_locations:
        print("Invalid file type. Use 'mp4' for video files or 'png' for image files.")
        return None

    file_path = os.path.join(participant_path, *relative_locations[file_type])
    if os.path.isfile(file_path):
        return file_path

    print(f"No '{file_type}' file found in {participant_folder}.")
    return None

In [41]:
# Tests
#print(get_path('P01', 'mp4'))
#print(get_path('P02', 'png'))
#print(get_path('P14', 'png'))
#print(get_path('P19', 'jpeg'))
#print(get_path('P21', 'png'))

In [52]:
# Get info on surface gaze or visibility count
def get_info(participant_folder, info_type, base_dir=None):
    """Return the value in the first row, second column of a participant's surface summary file.

    Parameters
    ----------
    participant_folder : str
        Name of the participant folder, e.g. 'P01'.
    info_type : str
        'gaz' reads 'surfaces/surface_gaze_distribution.csv',
        'vis' reads 'surfaces/surface_visibility.csv'.
    base_dir : str, optional
        Directory containing the participant folders. When omitted, the
        module-level ``root_dir`` is used. Pass it explicitly when ``root_dir``
        has been rebound to another directory by a later cell.

    Returns
    -------
    object or None
        The cell at row 0, column 1 of the file, as parsed by pandas without a
        header row. None, after printing the reason, when the participant
        folder, the info type or the file is not available.
    """
    if base_dir is None:
        base_dir = root_dir  # fall back to the notebook-level setting
    participant_path = os.path.join(base_dir, participant_folder)

    if not os.path.isdir(participant_path):
        print(f"Participant folder {participant_folder} does not exist.")
        return None

    summary_files = {
        'gaz': 'surface_gaze_distribution.csv',
        'vis': 'surface_visibility.csv',
    }
    if info_type not in summary_files:
        print("Invalid file type. Use 'gaz' for gaze surface info or 'vis' for visibility surface info.")
        return None

    file_path = os.path.join(participant_path, 'surfaces', summary_files[info_type])
    if os.path.isfile(file_path):
        df = pd.read_csv(file_path, header=None)
        return df.iloc[0, 1]

    print(f"No '{info_type}' info found in {participant_folder}.")
    return None


In [42]:
# Tests
#print(get_info('P01', 'gaz'))
#print(get_info('P02', 'vis'))
#print(get_info('P14', 'vis'))
#print(get_info('P19', 'jpeg'))
#print(get_info('P21', 'pup'))

### 3. Categorical Variables
Identify categorical variables and save their levels to a JSON file.

In [45]:
# Instantiate output variable and set root directory
# cat_vars is filled by get_categoricals as {file name: {column name: list of unique values}}.
cat_vars = {}
# NOTE: this rebinds the global root_dir from the raw exports to the aggregated data folder.
root_dir = './aggregated_data'

In [46]:
def get_categoricals(df, cat_vars, file_name, incl_col, unique_threshold=0.1):
    """Record the unique values of low-cardinality columns of ``df`` in ``cat_vars``.

    A column is treated as categorical when it is listed in ``incl_col`` and
    its number of distinct values divided by the number of rows is below
    ``unique_threshold``.

    Parameters
    ----------
    df : pandas.DataFrame
        Table to inspect.
    cat_vars : dict
        Accumulator of the form {file_name: {column: [unique values]}}. It is
        updated in place and also returned.
    file_name : str
        Key under which the columns of ``df`` are stored.
    incl_col : collection of str
        Column names that are eligible for the check; all others are ignored.
    unique_threshold : float, default 0.1
        Exclusive upper bound on the unique-value ratio.

    Returns
    -------
    dict
        The same ``cat_vars`` object that was passed in.
    """
    total_rows = len(df)
    if total_rows == 0:
        # The ratio is undefined for an empty table; nothing can be classified.
        return cat_vars

    for column in df.columns:
        if column not in incl_col:
            continue  # only inspect the explicitly requested columns

        # nunique() ignores missing values, whereas unique() below keeps them.
        unique_ratio = df[column].nunique() / total_rows
        if unique_ratio < unique_threshold:
            cat_vars.setdefault(file_name, {})[column] = df[column].unique().tolist()

    return cat_vars

def save_to_json(dictionary, file_path):
    """Write ``dictionary`` to ``file_path`` as JSON indented by four spaces.

    An existing file at ``file_path`` is overwritten.
    """
    with open(file_path, 'w') as json_file:
        json.dump(dictionary, json_file, indent=4)

In [47]:
# List of columns to verify
# Only columns named here are checked by get_categoricals; every other column is skipped.
columns_to_include = [
    'index',
    'filter_response',
    'base_data',
    'value',
    'method',
    'fixation classifier',
    'on_surf',
    'surface_name',
    'event_type',
    'num_definition_markers',

]

In [34]:
# Run the analysis: inspect every CSV found below root_dir for categorical columns
for folder, _, names in os.walk(root_dir):
    csv_names = [name for name in names if name.endswith('.csv')]
    for file in csv_names:
        df = pd.read_csv(os.path.join(folder, file))
        cat_vars = get_categoricals(df, cat_vars, file, columns_to_include)
        # Printed once the file has been inspected; the JSON itself is written after the loop.
        print(f"{file} saved.")

# Save the dictionary to a JSON file
file_path = 'categorical_columns.json'
save_to_json(cat_vars, file_path)
print(f"Dictionary saved to {file_path}")


all_blinks.csv saved.
all_blink_detection_report.csv saved.
all_export_info.csv saved.
all_fixations.csv saved.
all_fixation_report.csv saved.
all_gaze_positions.csv saved.
all_pupil_positions.csv saved.
all_world_timestamps.csv saved.
all_fixations_on_surface_HiDrive_Studie2.csv saved.
all_gaze_positions_on_surface_HiDrive_Studie2.csv saved.
all_marker_detections.csv saved.
all_surface_events.csv saved.
all_surface_gaze_distribution.csv saved.
all_surface_visibility.csv saved.
all_surf_positions_HiDrive_Studie2.csv saved.
Dictionary saved to categorical_columns.json


### 4. Data Loading
Load the aggregated data into a single dictionary of Dask DataFrames (ddfs).

In [None]:
# Load ddfs into dictionary (exception for 'all_surf_pos', df)
root_dir = './aggregated_data'
def load_data(root_dir, exception=False):
    """Read every CSV below ``root_dir`` into a dict keyed by file name without extension.

    CSV files are opened lazily with Dask. The surface-positions file is the
    exception: it is skipped unless ``exception`` is True, in which case it is
    read with pandas using the module-level ``converters`` mapping.
    """
    surf_positions_csv = 'all_surf_positions_HiDrive_Studie2.csv'
    loaded = {}
    for folder, _, names in os.walk(root_dir):
        for name in names:
            full_path = os.path.join(folder, name)
            key = os.path.splitext(name)[0]  # file name without '.csv'
            if name == surf_positions_csv:
                if exception:
                    loaded[key] = pd.read_csv(full_path, converters=converters)
            elif name.endswith('.csv'):
                loaded[key] = dd.read_csv(full_path)
    return loaded
def parse(filedata):
    """Turn the text of a bracketed numeric matrix into a list of float rows.

    The text is split on newlines. Surrounding whitespace, leading '[' and
    trailing ']' are removed from each line, and the remaining
    whitespace-separated tokens are converted to floats. Lines that are empty
    after stripping are skipped.
    """
    rows = []
    for raw_line in filedata.split('\n'):
        stripped = raw_line.strip().rstrip(']').lstrip('[')
        if stripped:
            rows.append([float(token) for token in stripped.split()])
    return rows
# Columns that are decoded with parse() when the surface-positions CSV is read with pandas
converters = {
    column: parse
    for column in (
        "img_to_surf_trans",
        "surf_to_img_trans",
        "dist_img_to_surf_trans",
        "surf_to_dist_img_trans",
    )
}

data = load_data(root_dir)

### 5. Unix Timestamps
Convert Pupil timestamps into Unix timestamps.

##### Data Loading

In [3]:
# Load ddfs into dictionary (exception for 'all_surf_pos', df)
root_dir = './aggregated_data'
def load_data(root_dir, exception=True):
    """Read every CSV below ``root_dir`` into a dict keyed by file name without extension.

    CSV files are opened lazily with Dask. The surface-positions file is the
    exception: it is read with pandas using the module-level ``converters``
    mapping when ``exception`` is True (the default here) and skipped otherwise.
    """
    surf_positions_csv = 'all_surf_positions_HiDrive_Studie2.csv'
    loaded = {}
    for folder, _, names in os.walk(root_dir):
        for name in names:
            full_path = os.path.join(folder, name)
            key = os.path.splitext(name)[0]  # file name without '.csv'
            if name == surf_positions_csv:
                if exception:
                    loaded[key] = pd.read_csv(full_path, converters=converters)
            elif name.endswith('.csv'):
                loaded[key] = dd.read_csv(full_path)
    return loaded
def parse(filedata):
    """Turn the text of a bracketed numeric matrix into a list of float rows.

    The text is split on newlines. Surrounding whitespace, leading '[' and
    trailing ']' are removed from each line, and the remaining
    whitespace-separated tokens are converted to floats. Lines that are empty
    after stripping are skipped.
    """
    rows = []
    for raw_line in filedata.split('\n'):
        stripped = raw_line.strip().rstrip(']').lstrip('[')
        if stripped:
            rows.append([float(token) for token in stripped.split()])
    return rows
# Columns that are decoded with parse() when the surface-positions CSV is read with pandas
converters = {
    column: parse
    for column in (
        "img_to_surf_trans",
        "surf_to_img_trans",
        "dist_img_to_surf_trans",
        "surf_to_dist_img_trans",
    )
}

data = load_data(root_dir)
# Participant numbers 1..20 without 3
participant_ids = [pid for pid in range(1, 21) if pid != 3]

##### Conversion parameter

In [4]:
# Get unix offsets per participant
# Participant number -> base name of its metadata JSON ('p01' ... 'p20'); number 3 maps to the placeholder '-'.
json_names = {pid: ('-' if pid == 3 else f'p{pid:02d}') for pid in range(1, 21)}
# Every offset starts at 0.0 and is overwritten below for each id in participant_ids.
unix_offset = {pid: 0.0 for pid in range(1, 21) if pid != 3}

for pid in participant_ids:
    meta_path = f"./info_players/{json_names[pid]}.json"
    with open(meta_path) as file:
        meta_info = json.load(file)
    # Offset = system start time minus synced start time, both read from the participant's metadata.
    start_timestamp_diff = meta_info["start_time_system_s"] - meta_info["start_time_synced_s"]
    unix_offset[pid] = start_timestamp_diff

unix_offset

{1: 1710995923.0638132,
 2: 1710995923.0869443,
 4: 1710995922.0329595,
 5: 1710995922.054,
 6: 1710422800.0617552,
 7: 1710422798.8454993,
 8: 1710837270.471259,
 9: 1710837270.4714973,
 10: 1711015966.974017,
 11: 1711015966.8716204,
 12: 1711015966.3487737,
 13: 1711015966.4835043,
 14: 1711015966.4888632,
 15: 1711377444.7161496,
 16: 1711377443.4752297,
 17: 1711527764.3984728,
 18: 1711545871.2865818,
 19: 1711545870.2516506,
 20: 1711545870.261577}

##### Conversion Computation

In [17]:
#  Run
# Timestamp columns to convert, keyed by the name of the table in `data`.
dfs_info = {
    "all_blinks": ["start_timestamp", "end_timestamp"],
    "all_fixations": ["start_timestamp"],
    "all_gaze_positions": ["gaze_timestamp"],
    "all_pupil_positions": ["pupil_timestamp"],
    "all_world_timestamps": ["# timestamps [seconds]"],
    "all_fixations_on_surface_HiDrive_Studie2": ["world_timestamp", "start_timestamp"],
    "all_gaze_positions_on_surface_HiDrive_Studie2": ["world_timestamp", "gaze_timestamp"],
    "all_surf_positions_HiDrive_Studie2": ["world_timestamp"],
    "all_surface_events": ["world_timestamp"]
}

output_dir = "./aggregated_data_unix"
os.makedirs(output_dir, exist_ok=True)
warnings.filterwarnings("ignore", category=UserWarning)


for file_name, timestamp_columns in dfs_info.items():
    print(f"Computing '{file_name}'...")
    ddf = data[file_name]
    # The surface-positions table is loaded with pandas; every other table is a Dask DataFrame.
    is_pandas = isinstance(ddf, pd.DataFrame)
    to_datetime = pd.to_datetime if is_pandas else dd.to_datetime
    participant_ddfs = []

    for pid in participant_ids:
        participant_offset = unix_offset[pid]
        ddf_participant = ddf[ddf['participant_id'] == pid]
        if is_pandas:
            # Work on an independent copy so the column assignments below do not
            # target a filtered view of the original frame.
            ddf_participant = ddf_participant.copy()

        for col in timestamp_columns:
            unix_col_name = f"{col}_unix"
            datetime_col_name = f"{col}_dt"

            # Shift the recorded timestamp by the participant's offset, then
            # express it as a timezone-aware datetime (UTC -> Europe/Berlin).
            ddf_participant[unix_col_name] = ddf_participant[col] + participant_offset
            utc_datetimes = to_datetime(ddf_participant[unix_col_name], unit='s').dt.tz_localize('UTC')
            ddf_participant[datetime_col_name] = utc_datetimes.dt.tz_convert('Europe/Berlin')

        # Collect the processed dataframe for this participant
        participant_ddfs.append(ddf_participant)

    # Concatenate and export results
    output_file_path = os.path.join(output_dir, f"{file_name}.csv")
    print(f"Exporting {file_name} in {output_file_path}")
    if is_pandas:
        result_df = pd.concat(participant_ddfs)
        result_df.to_csv(output_file_path, index=False)
    else:
        result_ddf = dd.concat(participant_ddfs)
        # Write the Dask result itself; the original referenced the pandas-only `result_df` here.
        result_ddf.to_csv(output_file_path, single_file=True)

    print(f"Exported {file_name}")

Computing 'all_pupil_positions'...
Exporting all_pupil_positions in ./aggregated_data_unix\all_pupil_positions.csv
Exported all_pupil_positions
