Code created together with Marc Vidal De Palol.

# Preprocessing: Unify the timestream and create all necessary dfs 

In [None]:
# run to free up memory
%reset

__Data Saved:__

Important: participant 8 was recorded in two files; they are combined into one in script 1.5v_.
-  __Timestamps_try_uid.json__ --> JSON with the earliest stream start subtracted from the timestamps of all kept behavioral streams
-  __Timestamps_new_uid.json__ --> json with sorted timestamps (to match ETW)
- __Timestamps_misses_uid.json__ --> all the timestamps not matching ETW (mostly to check for errors)
- __Timestamps_overall_uid.json__ --> one timestream all other streams share
- __Behavior_new_uid.csv__ --> csv with behavioral columns, all with the same timestamps 
- __HitInfo_new_uid_raw.csv__ --> save all HitInfo with the same timestamp (for each timepoint save 30 rows, most are nulls)
- __HitInfo_new_uid.csv__ --> save HitInfo same as before but only rows that are not null
- __HitDistance_new_uid.csv__ --> save for each entry the smallest hit distance 
- __HitsSorted_new_uid.csv__ --> for each timepoint, the closest hit together with its HitInfo and distance (HitInfo_new reduced to the closest hit per timepoint)

## Dependencies

__Dependencies__

In [None]:
import copy  # copy big/deep objects by value
import datetime  # datetime operations
import itertools  # operate with iterators
import json  # read/write from/into json format
import os  # OS operations (read/write files/folders)
import warnings  # hide warnings

# process parallelization
from multiprocessing import Manager, Pool, RawArray, cpu_count

import matplotlib.pyplot as plt  # mother of plots for Python
import numpy as np  # array/matrix operations (e.g. linear algebra)
import pandas as pd  # operate with dataframes
import pyxdf  # read XDF files (LSL streams recordings)
import seaborn as sns  # matplotlib plotting nice with shortcuts
from IPython.display import Markdown, display  # print nicely
from tqdm.notebook import tqdm  # mother of progressbars

__Options__

In [None]:
# hide FutureWarning messages in the notebook output
warnings.simplefilter(action="ignore", category=FutureWarning)

# raw and processed data paths
PATH_RAW = "./data/raw"  # XDF recordings are loaded from here
PATH_PROC = "./data/processed"  # JSON/CSV outputs of this notebook go here

# specify decimals format on pandas tables (display only, not the saved files)
pd.options.display.float_format = "{:.3f}".format

STYLE = "darkgrid"
sns.set_style(STYLE)  # set seaborn plotting style

# progress bar customized format (passed as bar_format to every tqdm bar)
B_FORMAT = """📄 {n_fmt} of {total_fmt} {desc} processed: {bar} 
            {percentage:3.0f}% ⏱️{elapsed} ⏳{remaining} ⚙️{rate_fmt}{postfix}"""


# streams dropped from every recording before processing
# (EEG streams and streams with an irregular sampling rate)
REMOVE = [
    "openvibeSignal",
    "openvibeMarkers",
    "AgentRotation",
    "StaticAgentPosition",
    "StaticAgentRotation",
    "AgentPosition",
    "ValidationError",
    "ButtonPresses",
    "PlayerPosition",
]

# custom order of streams (to process recordings on the same order)
# NOTE(review): not referenced by the cells visible here — confirm it is
# still needed elsewhere
CUSTOM_ORDER = [
    "EyeTrackingWorld",
    "HitObjectNames",
    "HitObjectPositions",
    "HitPositionOnObjects",
    "EyeTrackingLocal",
    "HeadTracking",
    "HitObjectNamesHead",
    "HitObjectPositionsHead",
    "HitPositionOnObjectsHead",
]

# column order of Behavior_new_{uid}.csv (columns not listed here are dropped
# by the reindex in sort_data)
BEH_COLS = [
    "valid",
    "leftBlink",
    "rightBlink",
    "ETWTime",
    "ETWoriginX",
    "ETWoriginY",
    "ETWoriginZ",
    "ETWdirectionX",
    "ETWdirectionY",
    "ETWdirectionZ",
    "HON",
    "ETLoriginX",
    "ETLoriginY",
    "ETLoriginZ",
    "ETLdirectionX",
    "ETLdirectionY",
    "ETLdirectionZ",
    "HToriginX",
    "HToriginY",
    "HToriginZ",
    "HTdirectionX",
    "HTdirectionY",
    "HTdirectionZ",
]

# column order of the HitInfo dataframes
# (HON = hit object name, HOP = hit object position,
#  HPOO = hit position on object; X/Y/Z components)
HIT_COLS = [
    "valid",
    "leftBlink",
    "rightBlink",
    "HON",
    "HOPX",
    "HOPY",
    "HOPZ",
    "HPOOX",
    "HPOOY",
    "HPOOZ",
]
# column order of HitsSorted_new_{uid}.csv (HIT_COLS plus the hit distance)
HIT_SORT = [
    "valid",
    "leftBlink",
    "rightBlink",
    "HON",
    "distance",
    "HOPX",
    "HOPY",
    "HOPZ",
    "HPOOX",
    "HPOOY",
    "HPOOZ",
]

# dtypes specification to avoid dtype guessing warning when reading
# Behavior_new_{uid}.csv back (nullable booleans keep missing values)
CUSTOM_DTYPES = {
    "valid": "boolean",
    "leftBlink": "boolean",
    "rightBlink": "boolean",
}

CORES = cpu_count()  # number of cpu threads for multiprocessing
print(f"Total CPU threads: {CORES}")

__Helper functions__

In [None]:
def printmd(string):
    """Render ``string`` as Markdown in the notebook output."""
    rendered = Markdown(string)
    display(rendered)


def pbar_fork_hack():
    """
    Write a single space to stdout and flush it right away.

    Meant to be the ``initializer`` of a ``multiprocessing.Pool`` so that
    progress bars created in forked worker processes are shown by IPython
    front-ends such as Jupyter, avoiding:

        [IPKernelApp] WARNING | WARNING: attempted to send message from fork

    Usage:

        pool = Pool(processes=N_CORES, initializer=pbar_fork_hack)

    Sources:
     - https://github.com/ipython/ipython/issues/11049#issue-306086846
     - https://github.com/tqdm/tqdm/issues/485#issuecomment-473338308
    """
    padding = " "
    print(padding, end="", flush=True)

__Recordings info__

In [None]:
# load the df created in script 0v
# load the recordings table created in script 0v (indexed by participant id;
# this notebook reads its "file", "start" and "old_id" columns)
recordings = pd.read_csv("./recordings_village_old.csv", index_col=0)
# bare expression: the notebook shows the table as the cell output
recordings

Participant ids

In [None]:
# participants ids
# participants ids (index of the recordings table)
ids = recordings.index.tolist()
# old participant ids, in the same order as `ids`
old_id = recordings["old_id"].tolist()

## Prepare raw data

__Recording overview:__ display the different streams recorded

In [None]:
def read_labels(uid):
    """
    Print the name and channel labels of every stream in a participant's
    recording, as Markdown.

    Labels are the keys of the stream's description (as specified in Unity),
    ``None`` when the description is empty, and the string "None" for
    openvibe (EEG) streams, whose description is not inspected.

    Parameters:
        uid (str): Participant identifier.
    """
    fname = recordings.loc[uid].file
    streams, _ = pyxdf.load_xdf(f"{PATH_RAW}/{fname}")

    lines = [f"__Recording `{fname} - {uid}`__<br>"]
    for stream in streams:
        name = stream["info"]["name"][0]
        if "openvibe" in name:
            labels = "None"
        else:
            desc = stream["info"]["desc"][0]
            labels = list(desc.keys()) if desc else None
        lines.append(f"_{name}_: {labels}<br>")

    printmd("".join(lines))


# initialize pool of processes according to the available cpu core threads
pool = Pool(processes=CORES, initializer=pbar_fork_hack)

# list of participants from the recordings df
parts = recordings.index.tolist()

try:
    # recordings progress bar
    recs_pbar = tqdm(
        iterable=pool.imap(func=read_labels, iterable=parts),
        total=len(parts),
        desc="📼 recordings",
        dynamic_ncols=True,
        bar_format=B_FORMAT,
    )

    # loop necessary for displaying properly the progressbar with multiprocessing
    # source: https://stackoverflow.com/a/40133278
    for _ in recs_pbar:
        pass
except BaseException:
    # a worker raised (or the cell was interrupted): stop the remaining
    # workers instead of leaving them running in the background
    pool.terminate()
    raise
else:
    # all work done: close pool instance, no more work to submit
    pool.close()
finally:
    # wait for the worker processes to exit in either case
    pool.join()

__Subtract Start from all Streams__

In [None]:
# get list of participants ids
ids = recordings.index.tolist()
idd = ids[:]
old_id = recordings["old_id"].tolist()

# iterate over participants (with their old ids, kept for reference)
for uid, oid in zip(idd, old_id):
    # name of the file associated with the uid
    fname = recordings.loc[uid].file
    # start timestamp of the recording as listed in the recordings table
    # NOTE(review): not used below — the offset actually subtracted is the
    # earliest first timestamp across the kept streams (to_sub)
    start = round(recordings.loc[uid].start, 3)

    # load XDF data for the participant
    data, _ = pyxdf.load_xdf(f"{PATH_RAW}/{fname}")

    # remove streams specified in the REMOVE list (EEG + streams with irregular SR)
    data = [d for d in data if d["info"]["name"][0] not in REMOVE]

    # streams progress bar
    streams_pbar = tqdm(
        iterable=data,
        desc=f"🧻 streams from participant {uid}",
        dynamic_ncols=True,
        bar_format=B_FORMAT,
    )

    # first timestamp of every stream that has samples
    # (empty streams have no first timestamp and are skipped here)
    starts = [
        st["time_stamps"][0]
        for st in streams_pbar
        if st["info"]["name"][0] != "openvibeMarkers" and len(st["time_stamps"])
    ]
    # the earliest start across all streams becomes time zero
    to_sub = min(starts)

    # start-subtracted timestamps per stream, rounded to ms precision
    sorted_out = {}
    # iterate the plain list: the progress bar above is already exhausted
    for s in data:
        s_name = s["info"]["name"][0]
        if s_name == "openvibeMarkers":
            continue
        time = s["time_stamps"] - to_sub
        sorted_out[s_name] = [round(t, 3) for t in time]

    # store the shifted timestamps to a JSON file for each participant
    with open(f"{PATH_PROC}/Timestamps_try_{uid}.json", "w") as f:
        json.dump(sorted_out, f, indent=4)

## Sort Timestamps 

__Sort Timestamps to match ETW, takes a while__

In [None]:
def sort_timestamps(uid):
    """
    Align the timestamps of every stream to the EyeTrackingWorld (ETW) timeline.

    Reads ``Timestamps_try_{uid}.json`` (start-subtracted timestamps per
    stream). The ETW timestamps seed a common timeline. Every other stream
    (except those in REMOVE and Check) has each of its timestamps mapped as
    follows:

    - exact match in the common timeline: kept as is;
    - otherwise: replaced by the first common-timeline value, searching
      forward from the previous match, that lies within 5 ms of it;
    - otherwise: kept as is, recorded as a miss and, once the stream is
      done, inserted into the common timeline (which is then re-sorted) so
      later streams can match it.

    Writes ``Timestamps_new_{uid}.json`` (aligned timestamps per stream),
    ``Timestamps_misses_{uid}.json`` (unmatched timestamps per stream) and
    ``Timestamps_overall_{uid}.json`` (the final common timeline).

    Parameters:
        uid (str): Participant identifier.

    Raises:
        KeyError: if the recording has no "EyeTrackingWorld" stream.
    """
    # timestamps with the earliest stream start already subtracted
    with open(f"{PATH_PROC}/Timestamps_try_{uid}.json", "r") as f:
        times = json.load(f)
    streams = list(times.keys())

    ref = "EyeTrackingWorld"
    if ref not in times:
        raise KeyError(f"participant {uid}: no '{ref}' stream to align to")

    common_time = times[ref]  # shared timeline, grows with unmatched stamps
    new_times = {ref: times[ref]}  # re-aligned timestamps per stream
    check_misses = {ref: []}  # timestamps per stream that matched nothing

    # streams progress bar
    streams_pbar = tqdm(
        iterable=streams,
        desc=f"🧻 streams from participant {uid}",
        dynamic_ncols=True,
        bar_format=B_FORMAT,
    )

    for s_name in streams_pbar:
        if s_name in REMOVE or s_name in Check:
            continue

        # first position of every value of the common timeline (fixed while
        # this stream is processed): O(1) instead of `in` + list.index
        first_pos = {}
        for pos, ct in enumerate(common_time):
            first_pos.setdefault(ct, pos)

        tim = []  # aligned timestamps of this stream
        chm = []  # timestamps of this stream without any match
        start_world = 0  # forward search in common_time starts here
        for t in times[s_name]:
            if t in first_pos:
                # exact match (first occurrence if the value is duplicated)
                tim.append(t)
                start_world = first_pos[t]
                continue
            # otherwise snap to the next common stamp closer than 5 ms
            # (direction does not matter)
            for et in range(start_world, len(common_time)):
                if abs(common_time[et] - t) < 0.005:
                    tim.append(common_time[et])
                    start_world = et
                    break
            else:
                # nothing close enough: keep the original stamp
                tim.append(t)
                chm.append(t)

        # unmatched stamps join the common timeline for the next streams
        common_time = sorted(common_time + chm)
        new_times[s_name] = tim
        check_misses[s_name] = chm

    # save the new timestamps, the missed ones and the common timeline
    with open(f"{PATH_PROC}/Timestamps_new_{uid}.json", "w") as f:
        json.dump(new_times, f, indent=4)
    with open(f"{PATH_PROC}/Timestamps_misses_{uid}.json", "w") as f:
        json.dump(check_misses, f, indent=4)
    with open(f"{PATH_PROC}/Timestamps_overall_{uid}.json", "w") as f:
        json.dump(common_time, f, indent=4)


# streams that sort_timestamps does not re-align (EyeTrackingWorld is the
# reference timeline itself); read by sort_timestamps at call time
Check = [
    "EyeTrackingWorld",
]


pool = Pool(processes=CORES, initializer=pbar_fork_hack)

# participants ids
ids = recordings.index.tolist()
idd = ids[:]

try:
    # participants progress bar
    parts_pbar = tqdm(
        iterable=pool.imap_unordered(func=sort_timestamps, iterable=idd),
        total=len(idd),
        desc="📂 participants",
        dynamic_ncols=True,
        bar_format=B_FORMAT,
    )

    # loop necessary for displaying properly the progressbar with multiprocessing
    # source: https://stackoverflow.com/a/40133278
    for _ in parts_pbar:
        pass
except BaseException:
    # a worker raised (or the cell was interrupted): stop the remaining
    # workers instead of leaving them running in the background
    pool.terminate()
    raise
else:
    # all work done: close pool instance, no more work to submit
    pool.close()
finally:
    # wait for the worker processes to exit in either case
    pool.join()

## Create Behavioral df

__Put all behavioral data into one CSV file (except the detailed hit streams listed in IGNORE, e.g. HitObjectPositions)__

In [None]:
def _align_stream(data_s, s_name, stream_times, times_overall):
    """
    Build one row per common timestamp for a single XDF stream.

    Parameters:
        data_s (dict): XDF stream as returned by ``pyxdf.load_xdf``.
        s_name (str): Name of the stream.
        stream_times (list): Aligned timestamp of each sample of the stream,
            in the same order as ``data_s["time_series"]``.
        times_overall (list): Common timeline of the participant.

    Returns:
        list: For each entry of ``times_overall``, the sample recorded at
        that time (the first one if the timestamp repeats) or NaN(s) when
        there is none. For "HitObjectNames" only a marker is kept: "Empty"
        when the first channel is "Empty", "ObjectNames" otherwise.
    """
    series = data_s["time_series"]
    is_hon = s_name == "HitObjectNames"

    # first sample index of every timestamp: O(1) lookups instead of
    # scanning the whole stream for every common timestamp
    first_idx = {}
    for i, stamp in enumerate(stream_times):
        first_idx.setdefault(stamp, i)

    rows = []
    last_match = 0  # the NaN filler is sized like the last matched sample
    for t in times_overall:
        i = first_idx.get(t)
        if i is None:
            if is_hon:
                rows.append(np.nan)
            else:
                rows.append(list(np.full(len(series[last_match]), np.nan)))
        elif is_hon:
            rows.append("Empty" if series[i][0] == "Empty" else "ObjectNames")
            last_match = i
        else:
            rows.append(series[i])
            last_match = i
    return rows


def sort_data(uid):
    """
    Put all behavioral streams of a participant on the common timeline and
    save them as ``Behavior_new_{uid}.csv``.

    Streams in REMOVE are dropped and streams in IGNORE are skipped. Every
    other stream contributes its labelled channels, one row per timestamp of
    ``Timestamps_overall_{uid}.json`` (NaN where the stream has no sample at
    that time). Columns are finally ordered/selected by BEH_COLS.

    Parameters:
        uid (str): Participant identifier.

    Raises:
        ValueError: if no stream is left to combine.
    """
    # load data: raw data without the non relevant streams
    part = recordings.loc[uid].file
    data, _ = pyxdf.load_xdf(f"{PATH_RAW}/{part}")
    data = [d for d in data if d["info"]["name"][0] not in REMOVE]

    # load data: aligned timestamps per stream and the common timestream
    with open(f"{PATH_PROC}/Timestamps_new_{uid}.json", "r") as f:
        times = json.load(f)
    with open(f"{PATH_PROC}/Timestamps_overall_{uid}.json", "r") as f:
        times_overall = json.load(f)

    # streams progress bar
    streams_pbar = tqdm(
        iterable=data,
        desc=f"🧻 streams from participant {uid}",
        dynamic_ncols=True,
        bar_format=B_FORMAT,
    )

    frames = []  # one dataframe per stream, all indexed by times_overall
    for data_s in streams_pbar:
        s_name = data_s["info"]["name"][0]
        if s_name in IGNORE:
            continue
        rows = _align_stream(data_s, s_name, times[s_name], times_overall)
        # column names come from the stream description (set in Unity)
        check = data_s["info"]["desc"][0]
        labels = None if not check else list(check.keys())
        frames.append(
            pd.DataFrame(data=rows, columns=labels, index=times_overall)
        )

    if not frames:
        raise ValueError(f"participant {uid}: no behavioral streams to combine")

    # side by side on the shared index, then fix the column order
    beh_data = pd.concat(frames, axis=1, join="outer")
    beh_data = beh_data.reindex(columns=BEH_COLS)

    # save the csv file
    beh_data.to_csv(f"{PATH_PROC}/Behavior_new_{uid}.csv", index=True)

# streams skipped by sort_data when building the behavioral df
IGNORE = [
    "HitObjectPositions",
    "HitPositionOnObjects",
    "HitObjectNamesHead",
    "HitPositionOnObjectsHead",
    "HitObjectPositionsHead",
]

pool = Pool(processes=CORES, initializer=pbar_fork_hack)

# participants ids
ids = recordings.index.tolist()
# NOTE(review): only participants from position 18 onwards are (re)processed
# here, while the other cells process every participant — presumably resuming
# an interrupted run; use ids[:] for a full run
idd = ids[18:]

try:
    # participants progress bar
    parts_pbar = tqdm(
        iterable=pool.imap_unordered(func=sort_data, iterable=idd),
        total=len(idd),
        desc="📂 participants",
        dynamic_ncols=True,
        bar_format=B_FORMAT,
    )

    # loop necessary for displaying properly the progressbar with multiprocessing
    # source: https://stackoverflow.com/a/40133278
    for _ in parts_pbar:
        pass
except BaseException:
    # a worker raised (or the cell was interrupted): stop the remaining
    # workers instead of leaving them running in the background
    pool.terminate()
    raise
else:
    # all work done: close pool instance, no more work to submit
    pool.close()
finally:
    # wait for the worker processes to exit in either case
    pool.join()

## Create Hit Info dfs

__Create Hit-Info DataFrame__

In [None]:
def _rows_at(series, stamps, ts, width):
    """
    Pick, for each timestamp in ``ts``, the sample of ``series`` recorded at
    exactly that time.

    ``stamps[i]`` is the timestamp of ``series[i]``. If a timestamp occurs
    more than once in ``stamps``, the first sample wins. Timestamps without
    a sample get a fresh list of ``width`` NaNs.
    """
    first_idx = {}
    for i, stamp in enumerate(stamps):
        first_idx.setdefault(stamp, i)
    return [
        series[first_idx[t]] if t in first_idx else [np.nan] * width
        for t in ts
    ]


def process_hits(uid):
    """
    Arrange, process and store hit information data given a recording id.

    Every timepoint of ``Behavior_new_{uid}.csv`` is expanded into ``row_nr``
    rows (one per hit slot, as defined in Unity). Each row holds the
    valid/blink flags of that timepoint plus the hit object name (HON), the
    hit object position (HOP) and the hit position on object (HPOO).
    Writes ``HitInfo_new_{uid}_raw.csv`` with all rows. Also writes
    ``HitInfo_new_{uid}.csv`` with only valid rows that have a non-"Empty"
    name and no missing values.

    Parameters:
        uid (str): Participant identifier.

    Raises:
        KeyError: if one of the three hit streams is missing.
    """
    hit_streams = ("HitObjectNames", "HitObjectPositions", "HitPositionOnObjects")

    # load raw recording data, keep only the INCLUDE streams in that order
    fname = recordings.loc[uid].file
    data, _ = pyxdf.load_xdf(f"{PATH_RAW}/{fname}")
    data = [d for d in data if d["info"]["name"][0] in INCLUDE]
    data.sort(key=lambda d: INCLUDE.index(d["info"]["name"][0]))

    # read sorted timestamps of all streams (dict)
    with open(f"{PATH_PROC}/Timestamps_new_{uid}.json", "r") as f:
        times = json.load(f)

    # previously sorted behavioural data (also to include valid, blinks etc.)
    beh_data = pd.read_csv(
        f"{PATH_PROC}/Behavior_new_{uid}.csv", index_col=0, dtype=CUSTOM_DTYPES
    )

    row_nr = 30  # number of rows (hit slots) per timepoint, defined in Unity
    xyz_width = 3 * row_nr  # HOP/HPOO samples hold an x, y, z per hit slot

    # every timestamp repeated once per hit slot
    ts = beh_data.index.tolist()
    timestamps = np.repeat(ts, row_nr)

    # behavioural flags, repeated once per hit slot
    beh_cols = ["valid", "leftBlink", "rightBlink"]
    et = np.array([beh_data.valid, beh_data.leftBlink, beh_data.rightBlink]).T
    et_n = np.repeat(et, row_nr, axis=0)
    beh_df = pd.DataFrame(data=et_n, columns=beh_cols, index=timestamps)

    # timestamps, samples and labels of each hit stream
    print(f"{uid}: sort data")
    found = {}
    for d in data:
        s_name = d["info"]["name"][0]
        if s_name in hit_streams:
            check = d["info"]["desc"][0]
            labels = None if not check else list(check.keys())
            found[s_name] = (times[s_name], d["time_series"], labels)
    missing = [s for s in hit_streams if s not in found]
    if missing:
        raise KeyError(f"participant {uid}: missing hit stream(s) {missing}")
    t_hon, d_hon, l_hon = found["HitObjectNames"]
    t_hop, d_hop, l_hop = found["HitObjectPositions"]
    t_hpoo, d_hpoo, l_hpoo = found["HitPositionOnObjects"]

    # HON: one name per hit slot
    print(f"{uid}: hon")
    hon1 = _rows_at(d_hon, t_hon, ts, row_nr)
    hon = [name for row in hon1 for name in row]
    # HOP: one (x, y, z) triple per hit slot
    print(f"{uid}: hop")
    hop1 = _rows_at(d_hop, t_hop, ts, xyz_width)
    hop = [row[s : s + 3] for row in hop1 for s in range(0, xyz_width, 3)]
    # HPOO: one (x, y, z) triple per hit slot
    print(f"{uid}: hpoo")
    hpoo1 = _rows_at(d_hpoo, t_hpoo, ts, xyz_width)
    hpoo = [row[s : s + 3] for row in hpoo1 for s in range(0, xyz_width, 3)]

    # create dataframes for HON, HOP, and HPOO
    c_hon = pd.DataFrame(data=hon, columns=l_hon, index=timestamps)
    c_hop = pd.DataFrame(data=hop, columns=l_hop, index=timestamps)
    c_hpoo = pd.DataFrame(data=hpoo, columns=l_hpoo, index=timestamps)

    # concatenate dataframes to create the final hit_data
    hit_data = pd.concat([beh_df, c_hon, c_hop, c_hpoo], axis=1, join="outer")
    hit_data = hit_data.reindex(columns=HIT_COLS)

    # save the csv file (all slots, mostly empty)
    hit_data.to_csv(f"{PATH_PROC}/HitInfo_new_{uid}_raw.csv", index=True)

    # keep valid, non-empty hits without missing values; non-inplace ops
    # avoid modifying a filtered slice
    h_res = hit_data[hit_data.valid == True]  # noqa: E712 (elementwise)
    h_res = h_res[h_res.HON != "Empty"]
    h_res = h_res.replace("", np.nan).dropna()

    # store as CSV (clean)
    h_res.to_csv(f"{PATH_PROC}/HitInfo_new_{uid}.csv", index=True)


# stream names to include (process_hits also sorts the streams in this order)
INCLUDE = [
    "HitObjectNames",
    "HitObjectPositions",
    "HitPositionOnObjects",
]

pool = Pool(processes=CORES, initializer=pbar_fork_hack)

# participants ids
ids = recordings.index.tolist()
idd = ids[:]

try:
    # participants progress bar
    parts_pbar = tqdm(
        iterable=pool.imap_unordered(func=process_hits, iterable=idd),
        total=len(idd),
        desc="📂 participants",
        dynamic_ncols=True,
        bar_format=B_FORMAT,
    )

    # loop necessary for displaying properly the progressbar with multiprocessing
    # source: https://stackoverflow.com/a/40133278
    for _ in parts_pbar:
        pass
except BaseException:
    # a worker raised (or the cell was interrupted): stop the remaining
    # workers instead of leaving them running in the background
    pool.terminate()
    raise
else:
    # all work done: close pool instance, no more work to submit
    pool.close()
finally:
    # wait for the worker processes to exit in either case
    pool.join()

__Calculate hit distances at each timepoint and store the shortest one__

In [None]:
import math

# hit names of the participant's own avatar parts, which should not count as
# the looked-at object when they are the closest hit
_AVATAR_PARTS = ("BodyCube", "Head")


def _npc_number(name):
    """Return the integer formed by the digits of ``name`` (None if none)."""
    digits = "".join(filter(str.isdigit, name))
    return int(digits) if digits else None


def _pick_closest_hit(names, distances):
    """
    Choose which hit of one timepoint to keep.

    Works on the distinct distances in ascending order:
    - if the closest hit is an avatar part ("BodyCube" or "Head"), it is
      skipped, and so is the complementary part if it comes right after;
    - if the chosen hit is an NPC and the next one is a "face" with the same
      NPC number, less than 1 unit further away, the face is chosen instead.

    Parameters:
        names (list): HON value of every hit row at this timepoint.
        distances (list): distance of every hit row from the eye-tracking
            origin, in the same order as ``names``.

    Returns:
        tuple | None: (row position, distance) of the selected hit, or None
        when only avatar parts were hit.
    """
    uniq = sorted(set(distances))
    # row position (first occurrence) and name of each distinct distance
    pos = [distances.index(d) for d in uniq]
    hon = [names[p] for p in pos]

    count = 0
    first = hon[0]
    if isinstance(first, str) and any(part in first for part in _AVATAR_PARTS):
        other = "Head" if "BodyCube" in first else "BodyCube"
        count = 1
        if count < len(uniq) and isinstance(hon[count], str) and other in hon[count]:
            count = 2
        if count >= len(uniq):
            return None  # nothing but avatar parts were hit

    idx, closest = pos[count], uniq[count]

    # NPC body hit right in front of the same NPC's face: prefer the face
    if count + 1 < len(uniq):
        idx2, closest2 = pos[count + 1], uniq[count + 1]
        name, name2 = hon[count], hon[count + 1]
        if (
            isinstance(name, str)
            and isinstance(name2, str)
            and "NPC" in name
            and "face" in name2
            and _npc_number(name) is not None
            and _npc_number(name) == _npc_number(name2)
            and abs(closest - closest2) < 1
        ):
            idx, closest = idx2, closest2
    return idx, closest


# NOTE(review): calculate_hits in the next cell duplicates this selection
# logic — keep both in sync.
ids = recordings.index.tolist()
idd = ids[:]

for uid in idd:
    # read processed dataframes
    beh_df = pd.read_csv(
        f"{PATH_PROC}/Behavior_new_{uid}.csv", index_col=0, dtype=CUSTOM_DTYPES
    )
    times = beh_df.index.tolist()
    # eye-tracking (world) origin at every timepoint
    origins = beh_df[["ETWoriginX", "ETWoriginY", "ETWoriginZ"]].to_numpy(
        dtype=float
    )
    hit_df = pd.read_csv(f"{PATH_PROC}/HitInfo_new_{uid}.csv", index_col=0)
    # row positions of the hits of every timestamp, computed once instead of
    # scanning the whole frame for each timestamp
    hit_rows = hit_df.groupby(level=0, sort=False).indices

    to_indx = []  # timestamps to use as index of the sorted hits
    h_dis = {"row": [], "distance": []}  # "row" holds the hit's timestamp
    dis_sorted = {"distance": []}  # distance of each kept hit
    h_sorted = []  # HitInfo row of each kept hit

    # hits progress bar
    times_pbar = tqdm(
        iterable=range(len(times)),
        desc=f"⌚ timestamps from participant {uid}",
        dynamic_ncols=True,
        bar_format=B_FORMAT,
    )
    for t in times_pbar:  # for each timestamp
        tim = times[t]
        rows = hit_rows.get(tim)
        if rows is None:  # no hit at this timepoint
            continue
        h = hit_df.iloc[rows]
        eto = origins[t]
        # distance of every hit position on object from the ET origin
        hpoo = h[["HPOOX", "HPOOY", "HPOOZ"]].to_numpy(dtype=float)
        distances = [np.linalg.norm(p - eto) for p in hpoo]

        picked = _pick_closest_hit(h["HON"].tolist(), distances)
        if picked is None:
            continue
        idx, closest = picked
        to_indx.append(tim)
        h_dis["row"].append(tim)
        h_dis["distance"].append(closest)
        dis_sorted["distance"].append(closest)
        h_sorted.append(h.iloc[idx])

    # dataframe of just the hit timestamp and its distance
    hits = pd.DataFrame(h_dis)
    hits.to_csv(f"{PATH_PROC}/HitDistance_new_{uid}.csv", index=False)

    # hit info plus distance, one row per timestamp
    c = pd.DataFrame(dis_sorted, index=to_indx)
    hits_sorted = pd.DataFrame(data=h_sorted, columns=HIT_COLS, index=to_indx)
    hits_sorted = pd.concat([hits_sorted, c], axis=1, join="outer")
    # reorder so that distance follows HON
    hits_sorted = hits_sorted.reindex(columns=HIT_SORT)

    hits_sorted = hits_sorted[~hits_sorted.index.duplicated(keep="first")]
    hits_sorted.to_csv(f"{PATH_PROC}/HitsSorted_new_{uid}.csv", index=True)

In [None]:
def _hon_contains(name, token):
    """Return True when the hit object name ``name`` is a string containing ``token``.

    Non-string names (e.g. NaN read back from a CSV) never match.
    """
    return isinstance(name, str) and token in name


def _hon_number(name):
    """Return the integer formed by all digits in ``name``, or None if there are none."""
    if not isinstance(name, str):
        return None
    digits = "".join(filter(str.isdigit, name))
    return int(digits) if digits else None


def _select_closest_hit(names, distances):
    """
    Select the hit that counts as the looked-at object for one timestamp.

    Parameters:
        names (list): Hit object names (HON), one per hit.
        distances (list): Distance of each hit from the eye-tracking origin,
            aligned with ``names``.

    Returns:
        tuple | None: ``(position, distance)`` of the selected hit within the
        input lists, or None when no acceptable hit remains.

    Rules (distances are de-duplicated and sorted ascending; a distance maps
    back to the first hit having it):
        1. Start from the closest hit.
        2. If it is a "BodyCube" or "Head" hit, step to the next distance; if
           that one is the other part of the pair, step once more. If the
           distances run out while stepping, nothing is selected.
        3. If the current hit is an "NPC" and the next distance belongs to a
           "face" carrying the same number and lying less than 1 unit
           further away, the face is selected instead.
    """
    if not distances:
        return None
    # use unique distances for all bounds: identical hits share one slot
    unique = sorted(set(distances))
    count = 0
    closest = unique[count]
    idx = distances.index(closest)  # position of the closest hit
    found = True

    # rule 2: step past BodyCube/Head hits
    if _hon_contains(names[idx], "BodyCube"):
        partner = "Head"
    elif _hon_contains(names[idx], "Head"):
        partner = "BodyCube"
    else:
        partner = None
    if partner is not None:
        found = False
        count = 1
        if count < len(unique):
            closest = unique[count]
            idx = distances.index(closest)
            if _hon_contains(names[idx], partner):
                count += 1
                if count < len(unique):
                    closest = unique[count]
                    idx = distances.index(closest)
                    found = True
            else:
                found = True

    # rule 3: prefer an NPC's face right behind the NPC hit
    if count + 1 < len(unique):
        closest2 = unique[count + 1]
        idx2 = distances.index(closest2)
        number = _hon_number(names[idx])
        if (
            _hon_contains(names[idx], "NPC")
            and _hon_contains(names[idx2], "face")
            and number is not None
            and number == _hon_number(names[idx2])
            and abs(closest - closest2) < 1
        ):
            closest, idx, found = closest2, idx2, True

    return (idx, closest) if found else None


def calculate_hits(uid):
    """
    Calculate and store hit information data given a recording id.

    For every behavioral timestamp, the distance from the eye-tracking origin
    (ETWoriginX/Y/Z) to each hit position on object (HPOOX/Y/Z) recorded at
    that timestamp is computed, and one hit is chosen by
    ``_select_closest_hit``. Results are written to ``PATH_PROC``:

    - ``HitDistance_new_{uid}.csv``: timestamp ("row" column) and distance of
      each selected hit.
    - ``HitsSorted_new_{uid}.csv``: the selected HitInfo rows plus their
      distance, indexed by timestamp, keeping the first row per timestamp.

    Parameters:
        uid (str): Participant identifier.
    """
    # read processed dataframes
    beh_df = pd.read_csv(
        f"{PATH_PROC}/Behavior_new_{uid}.csv", index_col=0, dtype=CUSTOM_DTYPES
    )
    times = beh_df.index.tolist()
    hit_df = pd.read_csv(f"{PATH_PROC}/HitInfo_new_{uid}.csv", index_col=0)
    # timestamp -> positional indices of its HitInfo rows (file order kept).
    # Looking these up avoids a full boolean scan of hit_df for every
    # behavioral timestamp.
    hit_positions = hit_df.groupby(level=0, sort=False).indices

    # outputs, filled in lockstep for every selected hit
    to_indx = []  # timestamps, used as index of HitsSorted
    h_dis = {"row": [], "distance": []}  # "row" holds the hit's timestamp
    dis_sorted = {"distance": []}
    h_sorted = []  # selected HitInfo rows

    # timestamps progress bar
    times_pbar = tqdm(
        iterable=range(len(times)),
        desc=f"⌚ timestamps from participant {uid}",
        dynamic_ncols=True,
        bar_format=B_FORMAT,
    )
    for t in times_pbar:
        positions = hit_positions.get(times[t])
        if positions is None or len(positions) == 0:
            continue  # no hit recorded at this timestamp
        h = hit_df.iloc[positions]
        b = beh_df.iloc[t]
        # eye-tracking origin as a point in 3D space
        eto = np.array([b.ETWoriginX, b.ETWoriginY, b.ETWoriginZ])
        # hit positions on objects, one 3D point per hit
        hpoo = h[["HPOOX", "HPOOY", "HPOOZ"]].to_numpy(dtype=float)
        distances = [np.linalg.norm(point - eto) for point in hpoo]

        selection = _select_closest_hit(h["HON"].tolist(), distances)
        if selection is None:
            continue
        idx, closest = selection
        timestamp = h.index.tolist()[idx]
        to_indx.append(timestamp)
        h_dis["row"].append(timestamp)
        h_dis["distance"].append(closest)
        dis_sorted["distance"].append(closest)
        h_sorted.append(h.iloc[idx])

    # generate the dataframe of just the timestamp and the distance
    hits = pd.DataFrame(h_dis)
    hits.to_csv(f"{PATH_PROC}/HitDistance_new_{uid}.csv", index=False)

    # HitInfo columns plus distance, indexed by timestamp
    c = pd.DataFrame(dis_sorted, index=to_indx)
    hits_sorted = pd.DataFrame(data=h_sorted, columns=HIT_COLS, index=to_indx)
    hits_sorted = pd.concat([hits_sorted, c], axis=1, join="outer")
    # reorder columns as listed in HIT_SORT (intended: distance in second position)
    hits_sorted = hits_sorted.reindex(columns=HIT_SORT)
    # keep only the first selected hit per timestamp
    hits_sorted = hits_sorted[~hits_sorted.index.duplicated(keep="first")]
    hits_sorted.to_csv(f"{PATH_PROC}/HitsSorted_new_{uid}.csv", index=True)


# stream names to include
INCLUDE = [
    "HitObjectNames",
    "HitObjectPositions",
    "HitPositionOnObjects",
]

# participants ids
ids = recordings.index.tolist()
idd = ids[:]

# initialize pool of processes according to the available cpu core threads.
# The context manager terminates the workers if anything below raises (e.g. an
# exception from calculate_hits re-raised by imap_unordered), so they are not leaked.
with Pool(processes=CORES, initializer=pbar_fork_hack) as pool:
    # participants progress bar
    parts_pbar = tqdm(
        iterable=pool.imap_unordered(func=calculate_hits, iterable=idd),
        total=len(idd),
        desc="📂 participants",
        dynamic_ncols=True,
        bar_format=B_FORMAT,
    )

    # loop necessary for displaying properly the progressbar with multiprocessing
    # source: https://stackoverflow.com/a/40133278
    for _ in parts_pbar:
        pass

    # close pool instance, no more work to submit
    pool.close()
    # wait for the worker processes to terminate
    pool.join()