# 1 Reading EyeLinks' EDF recordings

In [2]:
# Copyright 2024 The Axon Lab <theaxonlab@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# We support and encourage derived works from this project, please read
# about our expectations at
#
#     https://www.nipreps.org/community/licensing/
# Derived from
#    https://github.com/TheAxonLab/hcph-sops/blob/mkdocs/code/eyetracking/convert.py
#    https://www.axonlab.org/hcph-sops/data-management/edf-to-bids/
# 
# Make sure the python version >=3.7 to support the statement
from __future__ import annotations 
from pathlib import Path
import pandas as pd
import numpy as np
from pyedfread import read_edf
from collections import defaultdict
from itertools import product, groupby
from warnings import warn

In [3]:
DEFAULT_EYE = "right"
DEFAULT_FREQUENCY = 1000
DEFAULT_MODE = "P-CR"
DEFAULT_SCREEN = (0, 800, 0, 600)

# EyeLink calibration coordinates from
# https://www.sr-research.com/calibration-coordinate-calculator/
EYELINK_CALIBRATION_COORDINATES = [
    (400, 300),
    (400, 51),
    (400, 549),
    (48, 300),
    (752, 300),
    (48, 51),
    (752, 51),
    (48, 549),
    (752, 549),
    (224, 176),
    (576, 176),
    (224, 424),
    (576, 424),
]

EYE_CODE_MAP = defaultdict(lambda: "unknown", {"R": "right", "L": "left", "RL": "both"})
EDF2BIDS_COLUMNS = {
    "g": '',
    "p": "pupil",
    "h": "href",
    "r": "raw",
    "fg": "fast",
    "fh": "fast_href",
    "fr": "fast_raw",
}

BIDS_COLUMNS_ORDER = (
    [f"eye{num}_{c}_coordinate" for num, c in product((1, 2), ("x", "y"))]
    + [f"eye{num}_pupil_size" for num in (1, 2)]
    + [f"eye{num}_pupil_{c}_coordinate" for num, c in product((1, 2), ("x", "y"))]
    + [f"eye{num}_fixation" for num in (1, 2)]
    + [f"eye{num}_saccade" for num in (1, 2)]
    + [f"eye{num}_blink" for num in (1, 2)]
    + [f"eye{num}_href_{c}_coordinate" for num, c in product((1, 2), ("x", "y"))]
    + [f"eye{num}_{c}_velocity" for num, c in product((1, 2), ("x", "y"))]
    + [f"eye{num}_href_{c}_velocity" for num, c in product((1, 2), ("x", "y"))]
    + [f"eye{num}_raw_{c}_velocity" for num, c in product((1, 2), ("x", "y"))]
    + [f"fast_{c}_velocity" for c in ("x", "y")]
    + [f"fast_{kind}_{c}_velocity" for kind, c in product(("href", "raw"), ("x", "y"))]
    + [f"screen_ppdeg_{c}_coordinate" for c in ("x", "y")]
    + ["timestamp"]
)


In [4]:
DATA_PATH = Path("D:\\Eye_Dataset\\Sub001\\230928_anatomical_MREYE_study\\ET_EDF")
edf_name = "JB2.EDF"
file_path = str(DATA_PATH / edf_name)
# file_path = "D:\\Eye_Dataset\\Sub001\\230928_anatomical_MREYE_study\\ET_EDF\\Bold_GR4.edf"

print(file_path)
recording, events, messages = read_edf(file_path)
print(messages)
ori_recording = recording
ori_messages = messages

D:\Eye_Dataset\Sub001\230928_anatomical_MREYE_study\ET_EDF\JB2.EDF
       time  trial                                            message
0   1973607     -1  !CAL \n>>>>>>> CALIBRATION (HV5,P-CR) FOR RIGH...
1   1973608     -1                           !CAL Calibration points:
2   1973608     -1                !CAL -10.5, -49.1         0,      0
3   1973608     -1                !CAL -10.5, -71.3         0,  -2457
4   1973608     -1                !CAL -10.1, -29.2         0,   2457
5   1973608     -1                !CAL -43.8, -49.2     -3474,      0
6   1973608     -1                !CAL  22.8, -51.0      3474,      0
7   1973608     -1  !CAL eye check box: (L,R,T,B)\n\t  -50    29  ...
8   1973608     -1  !CAL href cal range: (L,R,T,B)\n\t-5211  5211 ...
9   1973608     -1  !CAL Cal coeff:(X=a+bx+cy+dxx+eyy,Y=f+gx+goaly...
10  1973608     -1    !CAL Prenormalize: offx, offy = -10.544 -49.144
11  1973608     -1       !CAL Gains: cx:104.313 lx:104.623 rx:104.587
12  1973608     -1     

# 2 Parsing the messages

In [5]:
post_messages = messages.rename(
    columns={c: c.strip() for c in messages.columns.values}
).drop_duplicates()


In [6]:
print(post_messages)

       time  trial                                            message
0   1973607     -1  !CAL \n>>>>>>> CALIBRATION (HV5,P-CR) FOR RIGH...
1   1973608     -1                           !CAL Calibration points:
2   1973608     -1                !CAL -10.5, -49.1         0,      0
3   1973608     -1                !CAL -10.5, -71.3         0,  -2457
4   1973608     -1                !CAL -10.1, -29.2         0,   2457
5   1973608     -1                !CAL -43.8, -49.2     -3474,      0
6   1973608     -1                !CAL  22.8, -51.0      3474,      0
7   1973608     -1  !CAL eye check box: (L,R,T,B)\n\t  -50    29  ...
8   1973608     -1  !CAL href cal range: (L,R,T,B)\n\t-5211  5211 ...
9   1973608     -1  !CAL Cal coeff:(X=a+bx+cy+dxx+eyy,Y=f+gx+goaly...
10  1973608     -1    !CAL Prenormalize: offx, offy = -10.544 -49.144
11  1973608     -1       !CAL Gains: cx:104.313 lx:104.623 rx:104.587
12  1973608     -1       !CAL Gains: cy:145.928 ty:108.256 by:138.042
13  1973608     -1  

The prefix "!CAL" represents the calibration information. We extract the calibration headers here.

In [7]:
# print(post_messages.info())
# print(post_messages.message)
# print(post_messages.message[0])
# _cal_hdr = post_messages.message.str.startswith("!CAL")

In [8]:
# Extract calibration headers
_cal_hdr = post_messages.message.str.startswith("!CAL")
calibration = post_messages[_cal_hdr]
print("Calibration")
print(calibration)
messages_drop_cal = post_messages.drop(post_messages.index[_cal_hdr])
print("Messages_drop_cal")
print(messages_drop_cal)

Calibration
       time  trial                                            message
0   1973607     -1  !CAL \n>>>>>>> CALIBRATION (HV5,P-CR) FOR RIGH...
1   1973608     -1                           !CAL Calibration points:
2   1973608     -1                !CAL -10.5, -49.1         0,      0
3   1973608     -1                !CAL -10.5, -71.3         0,  -2457
4   1973608     -1                !CAL -10.1, -29.2         0,   2457
5   1973608     -1                !CAL -43.8, -49.2     -3474,      0
6   1973608     -1                !CAL  22.8, -51.0      3474,      0
7   1973608     -1  !CAL eye check box: (L,R,T,B)\n\t  -50    29  ...
8   1973608     -1  !CAL href cal range: (L,R,T,B)\n\t-5211  5211 ...
9   1973608     -1  !CAL Cal coeff:(X=a+bx+cy+dxx+eyy,Y=f+gx+goaly...
10  1973608     -1    !CAL Prenormalize: offx, offy = -10.544 -49.144
11  1973608     -1       !CAL Gains: cx:104.313 lx:104.623 rx:104.587
12  1973608     -1       !CAL Gains: cy:145.928 ty:108.256 by:138.042
13  1973

In [9]:
# # I don't know what is first trigger and last trigger
# message_first_trigger =  'START'
# message_last_trigger = 'STOP'
# start_rows = post_messages.message.str.contains(
#     message_first_trigger, case=False, regex=True
# )

# stop_rows = post_messages.message.str.contains(
#     message_last_trigger, case=False, regex=True
# )


# # # Pick the LAST of the start messages
# metadata["StartTime"] = (
#     int(messages[start_rows].time.values[-1])
#     if start_rows.any()
#     else None
# )

# # # Pick the FIRST of the stop messages
# metadata["StopTime"] = (
#     int(messages[stop_rows].time.values[0])
#     if stop_rows.any()
#     else None
# )

# # # Drop start and stop messages from messages dataframe
# # messages = messages.loc[~start_rows & ~stop_rows, :]


Extracting basic metadata. Next, we extract basic metadata: the eye being sampled, the ET approach, and the sampling frequency. These are parsed from the following message: 

!MODE RECORD CR 1000 2 0 R

In [10]:
import re
metadata = {
    'StopTime': None,
    'StartTime': None
}

In [11]:
# !MODE RECORD CR 1000 2 0 R
mode_record = post_messages.message.str.startswith("!MODE RECORD")
print(f"mode_record: {mode_record}")
# My compliment
# DEFAULT_MODE = 'CR'
# DEFAULT_EYE = 'R'
# what is the default freq? default mode? default eye?
meta_record = {
    "freq": DEFAULT_FREQUENCY,
    "mode": DEFAULT_MODE,
    "eye": DEFAULT_EYE,
}

EYE_CODE_MAP = {
    "L": "left",
    "R": "right"
}

if mode_record.any():
    try:
        meta_record = re.match(
            r"\!MODE RECORD (?P<mode>\w+) (?P<freq>\d+) \d \d (?P<eye>[RL]+)",
            messages[mode_record].message.iloc[-1].strip(),
        ).groupdict()

        meta_record["eye"] = EYE_CODE_MAP[meta_record["eye"]]
        meta_record["mode"] = (
            "P-CR" if meta_record["mode"] == "CR" else meta_record["mode"]
        )
    except AttributeError:
        warn(
            "Error extracting !MODE RECORD message, "
            "using default frequency, mode, and eye"
        )
    finally:
        messages = messages.loc[~mode_record]

eye = (
    ("right", "left") if meta_record["eye"] == "both" else (meta_record["eye"],)
)

metadata["SamplingFrequency"] = int(meta_record["freq"])
metadata["EyeTrackingMethod"] = meta_record["mode"]
metadata["RecordedEye"] = meta_record["eye"]

mode_record: 0     False
1     False
2     False
3     False
4     False
5     False
6     False
7     False
8     False
9     False
10    False
11    False
12    False
13    False
14    False
15    False
16    False
17    False
18    False
19    False
20    False
21    False
22    False
23    False
24    False
25    False
26    False
27    False
28    False
29    False
30    False
31    False
32    False
33    False
34     True
Name: message, dtype: bool


Extracting screen parameters. We then parse the GAZE_COORDS message, 

with format GAZE_COORDS 0.00 0.00 800.00 600.00:

In [12]:
# Extract GAZE_COORDS message signaling start of recording
gaze_msg = post_messages.message.str.startswith("GAZE_COORDS")
# print(f"gaze message: {gaze_msg}")


metadata["ScreenAOIDefinition"] = [
    "square",
    DEFAULT_SCREEN,
]

if gaze_msg.any():
    try:
        gaze_record = re.match(
            r"GAZE_COORDS (\d+\.\d+) (\d+\.\d+) (\d+\.\d+) (\d+\.\d+)",
            post_messages[gaze_msg].message.iloc[-1].strip(),
        ).groups()
        metadata["ScreenAOIDefinition"][1] = [
            int(round(float(gaze_record[0]))),
            int(round(float(gaze_record[2]))),
            int(round(float(gaze_record[1]))),
            int(round(float(gaze_record[3]))),
        ]
    except AttributeError:
        warn("Error extracting GAZE_COORDS")
    finally:
        post_messages = post_messages.loc[~gaze_msg]

Extracting parameters of the pupil fit model. Next, we look into messages containing ELCL_PROC and ELCL_EFIT_PARAMS, which specify the pupil fit. The format of these messages is as follows:

ELCL_PROC ELLIPSE (5)

ELCL_EFIT_PARAMS 1.01 4.00  0.15 0.05  0.65 0.65  0.00 0.00 0.30

In [13]:
# Extract ELCL_PROC AND ELCL_EFIT_PARAMS to extract pupil fit method
pupilfit_msg = post_messages.message.str.startswith("ELCL_PROC")
if pupilfit_msg.any():
    try:
        pupilfit_method = [
            val
            for val in post_messages[pupilfit_msg]
            .message.iloc[-1]
            .strip()
            .split(" ")[1:]
            if val
        ]
        metadata["PupilFitMethod"] = pupilfit_method[0].lower()
        metadata["PupilFitMethodNumberOfParameters"] = int(
            pupilfit_method[1].strip("(").strip(")")
        )
    except AttributeError:
        warn("Error extracting ELCL_PROC (pupil fitting method)")
    finally:
        post_messages = post_messages.loc[~pupilfit_msg]

In [14]:
pupilfit_msg_params = post_messages.message.str.startswith("ELCL_EFIT_PARAMS")

In [15]:
if pupilfit_msg_params.any():
    rows = post_messages[pupilfit_msg_params]
    row = rows.message.values[-1].strip().split(" ")[1:]
    try:
        metadata["PupilFitParameters"] = [
            tuple(float(val) for val in vals)
            for k, vals in groupby(row, key=bool)
            if k
        ]
    except AttributeError:
        warn("Error extracting ELCL_EFIT_PARAMS (pupil fitting parameters)")
    finally:
        print('finally')
        post_messages = post_messages.loc[~pupilfit_msg_params]

finally


Calibration validation. If calibration was performed, most likely a validation procedure was executed next. 

These messages take the format VALIDATE R 4POINT 4 RIGHT at 752,300 OFFSET 0.35 deg. -8.7,-3.8 pix.

In [16]:
# Extract VALIDATE messages for a calibration validation
validation_msg = post_messages.message.str.startswith("VALIDATE")

if validation_msg.any():
    metadata["ValidationPosition"] = []
    metadata["ValidationErrors"] = []

for i_row, validate_row in enumerate(post_messages[validation_msg].message.values):
    prefix, suffix = validate_row.split("OFFSET")
    validation_eye = (
        f"eye{eye.index('right') + 1}"
        if "RIGHT" in prefix
        else f"eye{eye.index('left') + 1}"
    )
    validation_coords = [
        int(val.strip())
        for val in prefix.rsplit("at", 1)[-1].split(",")
        if val.strip()
    ]
    metadata["ValidationPosition"].append(
        [validation_eye, validation_coords]
    )

    validate_values = [
        float(val)
        for val in re.match(
            r"(-?\d+\.\d+) deg\.\s+(-?\d+\.\d+),(-?\d+\.\d+) pix\.",
            suffix.strip(),
        ).groups()
    ]

    metadata["ValidationErrors"].append(
        (validation_eye, validate_values[0], tuple(validate_values[1:]))
    )
post_messages = post_messages.loc[~validation_msg]

In [17]:
print(metadata)
print(post_messages)

{'StopTime': None, 'StartTime': None, 'SamplingFrequency': 1000, 'EyeTrackingMethod': 'P-CR', 'RecordedEye': 'right', 'ScreenAOIDefinition': ['square', [0, 800, 0, 600]], 'PupilFitMethod': 'ellipse', 'PupilFitMethodNumberOfParameters': 5, 'PupilFitParameters': [(1.01, 4.0), (0.15, 0.05), (0.65, 0.65), (0.0, 0.0, 0.3)], 'ValidationPosition': [['eye1', [400, 300]], ['eye1', [400, 51]], ['eye1', [400, 549]], ['eye1', [48, 300]], ['eye1', [752, 300]]], 'ValidationErrors': [('eye1', 0.6, (-14.6, -7.0)), ('eye1', 0.34, (-8.6, -3.0)), ('eye1', 0.21, (-3.5, -4.4)), ('eye1', 0.85, (-14.4, -17.8)), ('eye1', 0.33, (5.7, 6.8))]}
       time  trial                                            message
0   1973607     -1  !CAL \n>>>>>>> CALIBRATION (HV5,P-CR) FOR RIGH...
1   1973608     -1                           !CAL Calibration points:
2   1973608     -1                !CAL -10.5, -49.1         0,      0
3   1973608     -1                !CAL -10.5, -71.3         0,  -2457
4   1973608     -1       

Extracting final bits of metadata. Finally, we parse the last of the THRESHOLDS messages.

In [18]:
# Extract THRESHOLDS messages prior recording and process last
thresholds_msg = post_messages.message.str.startswith("THRESHOLDS")
if thresholds_msg.any():
    metadata["PupilThreshold"] = [None] * len(eye)
    metadata["CornealReflectionThreshold"] = [None] * len(eye)
    thresholds_chunks = (
        post_messages[thresholds_msg].message.iloc[-1].strip().split(" ")[1:]
    )
    eye_index = eye.index(EYE_CODE_MAP[thresholds_chunks[0]])
    metadata["PupilThreshold"][eye_index] = int(thresholds_chunks[-2])
    metadata["CornealReflectionThreshold"][eye_index] = int(
        thresholds_chunks[-1]
    )
post_messages = post_messages.loc[~thresholds_msg]

In [19]:
print(post_messages)

       time  trial                                            message
0   1973607     -1  !CAL \n>>>>>>> CALIBRATION (HV5,P-CR) FOR RIGH...
1   1973608     -1                           !CAL Calibration points:
2   1973608     -1                !CAL -10.5, -49.1         0,      0
3   1973608     -1                !CAL -10.5, -71.3         0,  -2457
4   1973608     -1                !CAL -10.1, -29.2         0,   2457
5   1973608     -1                !CAL -43.8, -49.2     -3474,      0
6   1973608     -1                !CAL  22.8, -51.0      3474,      0
7   1973608     -1  !CAL eye check box: (L,R,T,B)\n\t  -50    29  ...
8   1973608     -1  !CAL href cal range: (L,R,T,B)\n\t-5211  5211 ...
9   1973608     -1  !CAL Cal coeff:(X=a+bx+cy+dxx+eyy,Y=f+gx+goaly...
10  1973608     -1    !CAL Prenormalize: offx, offy = -10.544 -49.144
11  1973608     -1       !CAL Gains: cx:104.313 lx:104.623 rx:104.587
12  1973608     -1       !CAL Gains: cy:145.928 ty:108.256 by:138.042
13  1973608     -1  

Flush the remaining messages as a metadata entry

In [20]:
# Consume the remainder of messages
if not post_messages.empty:
    metadata["LoggedMessages"] = [
        (int(msg_timestamp), msg.strip())
        for msg_timestamp, msg in post_messages[["time", "message"]].values
    ]

In [21]:
print(metadata)

{'StopTime': None, 'StartTime': None, 'SamplingFrequency': 1000, 'EyeTrackingMethod': 'P-CR', 'RecordedEye': 'right', 'ScreenAOIDefinition': ['square', [0, 800, 0, 600]], 'PupilFitMethod': 'ellipse', 'PupilFitMethodNumberOfParameters': 5, 'PupilFitParameters': [(1.01, 4.0), (0.15, 0.05), (0.65, 0.65), (0.0, 0.0, 0.3)], 'ValidationPosition': [['eye1', [400, 300]], ['eye1', [400, 51]], ['eye1', [400, 549]], ['eye1', [48, 300]], ['eye1', [752, 300]]], 'ValidationErrors': [('eye1', 0.6, (-14.6, -7.0)), ('eye1', 0.34, (-8.6, -3.0)), ('eye1', 0.21, (-3.5, -4.4)), ('eye1', 0.85, (-14.4, -17.8)), ('eye1', 0.33, (5.7, 6.8))], 'PupilThreshold': [68], 'CornealReflectionThreshold': [179], 'LoggedMessages': [(1973607, '!CAL \n>>>>>>> CALIBRATION (HV5,P-CR) FOR RIGHT: <<<<<<<<<'), (1973608, '!CAL Calibration points:'), (1973608, '!CAL -10.5, -49.1         0,      0'), (1973608, '!CAL -10.5, -71.3         0,  -2457'), (1973608, '!CAL -10.1, -29.2         0,   2457'), (1973608, '!CAL -43.8, -49.2     

# 3 Parsing the recording

In [22]:
# Normalize timestamps (should be int and strictly positive)
recording = ori_recording.astype({"time": int})
recording = recording[recording["time"] > 0]

In [23]:
columns = recording.columns
print("Columns:")
print(columns)

Columns:
Index(['time', 'px_left', 'px_right', 'py_left', 'py_right', 'hx_left',
       'hx_right', 'hy_left', 'hy_right', 'pa_left', 'pa_right', 'gx_left',
       'gx_right', 'gy_left', 'gy_right', 'rx', 'ry', 'gxvel_left',
       'gxvel_right', 'gyvel_left', 'gyvel_right', 'hxvel_left', 'hxvel_right',
       'hyvel_left', 'hyvel_right', 'rxvel_left', 'rxvel_right', 'ryvel_left',
       'ryvel_right', 'fgxvel', 'fgyvel', 'fhxvel', 'fhyvel', 'frxvel',
       'fryvel', 'flags', 'input', 'buttons', 'htype', 'errors'],
      dtype='object')


In [24]:
recording = recording.rename(
    columns={
        # Fix buggy header names generated by pyedfread
#         "fhxyvel": "fhxvel",
#         "frxyvel": "frxvel",
        # Normalize weird header names generated by pyedfread
        "rx": "screen_ppdeg_x_coordinate",
        "ry": "screen_ppdeg_y_coordinate",
        # Convert some BIDS columns
        "time": "timestamp",
    }
)

# Split extra columns from the dataframe
extra = recording[["flags", "input", "htype"]]
recording = recording.drop(columns=["flags", "input", "htype"])

# Remove columns that are always very close to zero
recording = recording.loc[:, (recording.abs() > 1e-8).any(axis=0)]
# Remove columns that are always 1e8 or more
recording = recording.loc[:, (recording.abs() < 1e8).any(axis=0)]
# Replace unreasonably high values with NaNs
recording = recording.replace({1e8: np.nan})

In [25]:
print(f'The eye we take care of {eye}')
remove_eye = set(("left", "right")) - set(eye)
print(f'The eye to be removed: {remove_eye}')

The eye we take care of ('right',)
The eye to be removed: {'left'}


In [26]:
# Drop one eye's columns if not interested in "both"
if remove_eye:
    remove_eye = remove_eye.pop()  # Drop set decoration
    #remove the corresponding column of the removed eye
    recording = recording.reindex(
        columns=[c for c in recording.columns if remove_eye not in c]
    )
    
columns = recording.columns
print("Columns:")
print(columns)

Columns:
Index(['timestamp', 'px_right', 'py_right', 'hx_right', 'hy_right', 'pa_right',
       'gx_right', 'gy_right', 'screen_ppdeg_x_coordinate',
       'screen_ppdeg_y_coordinate', 'gxvel_right', 'hxvel_right',
       'rxvel_right', 'frxvel'],
      dtype='object')


In [27]:
# Clean-up pupil size and gaze position. 
# These are the parameters we most likely we care for, so special curation is applied:
# Drop one eye's columns if not interested in "both"
# Complement screen resolution
screen_resolution = [800, 600]
columns = recording.columns
print(f'columns: {columns}')

for eyenum, eyename in enumerate(eye):
    # Clean-up implausible values for pupil area (pa)
    print(f'eyenum {eyenum}')
    print(f'eyename {eyename}')
    recording.loc[
        recording[f"pa_{eyename}"] < 1, f"pa_{eyename}"
    ] = np.nan
    recording = recording.rename(
        columns={f"pa_{eyename}": f"eye{eyenum + 1}_pupil_size"}
    )

    # Clean-up implausible values for gaze x position
    recording.loc[
        (recording[f"gx_{eyename}"] < 0)
        | (recording[f"gx_{eyename}"] > screen_resolution[0]),
        f"gx_{eyename}",
    ] = np.nan
    # Clean-up implausible values for gaze y position
    recording.loc[
        (recording[f"gy_{eyename}"] <= 0)
        | (recording[f"gy_{eyename}"] > screen_resolution[1]),
        f"gy_{eyename}",
    ] = np.nan

columns: Index(['timestamp', 'px_right', 'py_right', 'hx_right', 'hy_right', 'pa_right',
       'gx_right', 'gy_right', 'screen_ppdeg_x_coordinate',
       'screen_ppdeg_y_coordinate', 'gxvel_right', 'hxvel_right',
       'rxvel_right', 'frxvel'],
      dtype='object')
eyenum 0
eyename right


Munging columns to comply with BIDS. 

At this point, the dataframe is almost ready for writing out as BIDS.

In [59]:
# Interpolate BIDS column names
columns = list(
    set(recording.columns)
    - set(
        (
            "timestamp",
            "screen_ppdeg_x_coordinate",
            "screen_ppdeg_y_coordinate",
            "eye1_pupil_size",
            "eye2_pupil_size",
        )
    )
)
print(f'columns: {columns}')



columns: ['eye1_x_velocity', 'eye1_pupil_x_coordinate', 'eye1_y_coordinate', 'fast_raw_x_velocity', 'eye1_href_y_coordinate', 'eye1_fixation', 'eye1_href_x_coordinate', 'eye1_saccade', 'eye1_href_x_velocity', 'eye1_raw_x_velocity', 'eye1_blink', 'eye1_x_coordinate', 'eye1_pupil_y_coordinate']
columns: Index(['eye1_x_coordinate', 'eye1_y_coordinate', 'eye1_pupil_size',
       'eye1_pupil_x_coordinate', 'eye1_pupil_y_coordinate',
       'eye1_href_x_coordinate', 'eye1_href_y_coordinate', 'eye1_x_velocity',
       'eye1_href_x_velocity', 'eye1_raw_x_velocity', 'fast_raw_x_velocity',
       'screen_ppdeg_x_coordinate', 'screen_ppdeg_y_coordinate', 'timestamp',
       'eye1_fixation', 'eye1_saccade', 'eye1_blink'],
      dtype='object')


In [29]:
bids_columns = []
for eyenum, eyename in enumerate(eye):
    for name in columns:
        colprefix = f"eye{eyenum + 1}" if name.endswith(f"_{eyename}") else ""
        _newname = name.split("_")[0]
        print(f'_newname 1.0:{_newname}')
        _newname = re.sub(r"([xy])$", r"_\1_coordinate", _newname)
        print(f'coordinate: {_newname}')
        _newname = re.sub(r"([xy])vel$", r"_\1_velocity", _newname)
        print(f'velocity: {_newname}')
        _newname = _newname.split("_", 1)
        _newname[0] = EDF2BIDS_COLUMNS[_newname[0]]
        _newname.insert(0, colprefix)
        bids_columns.append("_".join((_n for _n in _newname if _n)))

# Rename columns to be BIDS-compliant
recording = recording.rename(columns=dict(zip(columns, bids_columns)))

# Reorder columns to render nicely (tracking first, pupil size after)
columns = sorted(
    set(recording.columns.values).intersection(BIDS_COLUMNS_ORDER),
    key=lambda entry: BIDS_COLUMNS_ORDER.index(entry),
)
columns += [c for c in recording.columns.values if c not in columns]
recording = recording.reindex(columns=columns)
print(f'Final column: {recording.columns}')

_newname 1.0:hxvel
coordinate: hxvel
velocity: h_x_velocity
_newname 1.0:rxvel
coordinate: rxvel
velocity: r_x_velocity
_newname 1.0:px
coordinate: p_x_coordinate
velocity: p_x_coordinate
_newname 1.0:hy
coordinate: h_y_coordinate
velocity: h_y_coordinate
_newname 1.0:frxvel
coordinate: frxvel
velocity: fr_x_velocity
_newname 1.0:gx
coordinate: g_x_coordinate
velocity: g_x_coordinate
_newname 1.0:hx
coordinate: h_x_coordinate
velocity: h_x_coordinate
_newname 1.0:gy
coordinate: g_y_coordinate
velocity: g_y_coordinate
_newname 1.0:gxvel
coordinate: gxvel
velocity: g_x_velocity
_newname 1.0:py
coordinate: p_y_coordinate
velocity: p_y_coordinate
Final column: Index(['eye1_x_coordinate', 'eye1_y_coordinate', 'eye1_pupil_size',
       'eye1_pupil_x_coordinate', 'eye1_pupil_y_coordinate',
       'eye1_href_x_coordinate', 'eye1_href_y_coordinate', 'eye1_x_velocity',
       'eye1_href_x_velocity', 'eye1_raw_x_velocity', 'fast_raw_x_velocity',
       'screen_ppdeg_x_coordinate', 'screen_ppdeg_y

Parsing the calibration messages

In [30]:
print(calibration)

       time  trial                                            message
0   1973607     -1  !CAL \n>>>>>>> CALIBRATION (HV5,P-CR) FOR RIGH...
1   1973608     -1                           !CAL Calibration points:
2   1973608     -1                !CAL -10.5, -49.1         0,      0
3   1973608     -1                !CAL -10.5, -71.3         0,  -2457
4   1973608     -1                !CAL -10.1, -29.2         0,   2457
5   1973608     -1                !CAL -43.8, -49.2     -3474,      0
6   1973608     -1                !CAL  22.8, -51.0      3474,      0
7   1973608     -1  !CAL eye check box: (L,R,T,B)\n\t  -50    29  ...
8   1973608     -1  !CAL href cal range: (L,R,T,B)\n\t-5211  5211 ...
9   1973608     -1  !CAL Cal coeff:(X=a+bx+cy+dxx+eyy,Y=f+gx+goaly...
10  1973608     -1    !CAL Prenormalize: offx, offy = -10.544 -49.144
11  1973608     -1       !CAL Gains: cx:104.313 lx:104.623 rx:104.587
12  1973608     -1       !CAL Gains: cy:145.928 ty:108.256 by:138.042
13  1973608     -1  

In [31]:
# Parse calibration metadata
from warnings import warn

metadata["CalibrationCount"] = 0
if not calibration.empty:
    warn("Calibration of more than one eye is not implemented")
    calibration.message = calibration.message.str.replace("!CAL", "")
    calibration.message = calibration.message.str.strip()

    metadata["CalibrationLog"] = list(
        zip(
            calibration.time.values.astype(int),
            calibration.message.values,
        )
    )

    calibrations_msg = calibration.message.str.startswith(
        "VALIDATION"
    ) & calibration.message.str.contains("ERROR")
    metadata["CalibrationCount"] = calibrations_msg.sum()

    calibration_last = calibration.index[calibrations_msg][-1]
    try:
        meta_calib = re.match(
            r"VALIDATION (?P<ctype>[\w\d]+) (?P<eyeid>[RL]+) (?P<eye>RIGHT|LEFT) "
            r"(?P<result>\w+) ERROR (?P<avg>-?\d+\.\d+) avg\. (?P<max>-?\d+\.\d+) max\s+"
            r"OFFSET (?P<offsetdeg>-?\d+\.\d+) deg\. "
            r"(?P<offsetxpix>-?\d+\.\d+),(?P<offsetypix>-?\d+\.\d+) pix\.",
            calibration.loc[calibration_last, "message"].strip(),
        ).groupdict()

        metadata["CalibrationType"] = meta_calib["ctype"]
        metadata["AverageCalibrationError"] = [float(meta_calib["avg"])]
        metadata["MaximalCalibrationError"] = [float(meta_calib["max"])]
        metadata["CalibrationResultQuality"] = [meta_calib["result"]]
        metadata["CalibrationResultOffset"] = [
            float(meta_calib["offsetdeg"]),
            (float(meta_calib["offsetxpix"]), float(meta_calib["offsetypix"])),
        ]
        metadata["CalibrationResultOffsetUnits"] = ["deg", "pixels"]
    except AttributeError:
        warn("Calibration data found but unsuccessfully parsed for results")

  


# 4 Parsing the events dataframe

         eye1_x_coordinate  eye1_y_coordinate  eye1_pupil_size  \
0                      NaN                NaN              NaN   
1                      NaN                NaN              NaN   
2                      NaN                NaN              NaN   
3                      NaN                NaN              NaN   
4                      NaN                NaN              NaN   
...                    ...                ...              ...   
1106532         381.500000         296.299988           2185.0   
1106533         381.600006         296.899994           2181.0   
1106534         381.700012         296.899994           2180.0   
1106535         381.299988         297.100006           2183.0   
1106536         380.799988         298.000000           2192.0   

         eye1_pupil_x_coordinate  eye1_pupil_y_coordinate  \
0                       -32768.0                 -32768.0   
1                       -32768.0                 -32768.0   
2                       

In [33]:
# Process events: first generate empty columns
# Three different events here
# Assign the value 1 to the different events
recording["eye1_fixation"] = 0
recording["eye1_saccade"] = 0
recording["eye1_blink"] = 0

# Add fixations
for _, fixation_event in events[
    events["type"] == "fixation"
].iterrows():
    recording.loc[
        (recording["timestamp"] >= fixation_event["start"])
        & (recording["timestamp"] <= fixation_event["end"]),
        "eye1_fixation",
    ] = 1

# Add saccades, and blinks, which are a sub-event of saccades
for _, saccade_event in events[
    events["type"] == "saccade"
].iterrows():
    recording.loc[
        (recording["timestamp"] >= saccade_event["start"])
        & (recording["timestamp"] <= saccade_event["end"]),
        "eye1_saccade",
    ] = 1

    if saccade_event["blink"] == 1:
        recording.loc[
            (recording["timestamp"] >= saccade_event["start"])
            & (recording["timestamp"] <= saccade_event["end"]),
            "eye1_blink",
        ] = 1

In [34]:
print(recording)

         eye1_x_coordinate  eye1_y_coordinate  eye1_pupil_size  \
0                      NaN                NaN              NaN   
1                      NaN                NaN              NaN   
2                      NaN                NaN              NaN   
3                      NaN                NaN              NaN   
4                      NaN                NaN              NaN   
...                    ...                ...              ...   
1106532         381.500000         296.299988           2185.0   
1106533         381.600006         296.899994           2181.0   
1106534         381.700012         296.899994           2180.0   
1106535         381.299988         297.100006           2183.0   
1106536         380.799988         298.000000           2192.0   

         eye1_pupil_x_coordinate  eye1_pupil_y_coordinate  \
0                       -32768.0                 -32768.0   
1                       -32768.0                 -32768.0   
2                       

Writing the data into the BIDS structure

In [56]:
class EyeTrackingRun:
    """
    Class representing an instance of eye tracking data.

    Examples
    --------
    >>> et_run = EyeTrackingRun(
    ...     recording=recording_df,
    ...     events=events_df,
    ...     messages=messages_df,
    ...     message_first_trigger="start",
    ...     message_last_trigger="end",
    ...     metadata={"MyMetadata": "value"},
    ... )

    """

    def __init__(
        self,
        recording: pd.DataFrame,
        events: pd.DataFrame,
        messages: pd.DataFrame,
        message_first_trigger: str,
        message_last_trigger: str,
        metadata: dict | None = None,
    ) -> None:
        """
        Initialize EyeTrackingRun instance.

        Parameters
        ----------
        recording : pd.DataFrame
            DataFrame containing eye tracking recording.
        events : pd.DataFrame
            DataFrame containing eye tracking events.
        messages : pd.DataFrame
            DataFrame containing eye tracking messages.
        message_first_trigger : str
            Message body that signals the start of the experiment run.
        message_last_trigger : str
            Message body that signals the end of the experiment run.
        metadata : dict
            A dictionary to bootstrap the metadata (e.g., with defaults).

        Notes
        -----
        This method initializes the EyeTrackingRun instance with the provided parameters.

        """
        self.recording = recording
        self.events = events
        self.metadata = metadata or {}

        # Messages may have headers ending with space and drop duplicate rows
        messages = messages.rename(
            columns={c: c.strip() for c in messages.columns.values}
        ).drop_duplicates()

        # Extract calibration headers
        _cal_hdr = messages.message.str.startswith("!CAL")
        calibration = messages[_cal_hdr]
        messages = messages.drop(messages.index[_cal_hdr])

        # Find Start time
        start_rows = messages.message.str.contains(
            message_first_trigger, case=False, regex=True
        )
        stop_rows = messages.message.str.contains(
            message_last_trigger, case=False, regex=True
        )

        # Pick the LAST of the start messages
        self.metadata["StartTimestamp"] = (
            int(messages[start_rows].time.values[-1])
            if start_rows.any()
            else None
        )

        # Pick the FIRST of the stop messages
        self.metadata["StopTimestamp"] = (
            int(messages[stop_rows].time.values[0]) if stop_rows.any() else None
        )

        # Drop start and stop messages from messages dataframe
        messages = messages.loc[~start_rows & ~stop_rows, :]

        # Extract !MODE RECORD message signaling start of recording
        mode_record = messages.message.str.startswith("!MODE RECORD")

        meta_record = {
            "freq": DEFAULT_FREQUENCY,
            "mode": DEFAULT_MODE,
            "eye": DEFAULT_EYE,
        }

        if mode_record.any():
            try:
                meta_record = re.match(
                    r"\!MODE RECORD (?P<mode>\w+) (?P<freq>\d+) \d \d (?P<eye>[RL]+)",
                    messages[mode_record].message.iloc[-1].strip(),
                ).groupdict()

                meta_record["eye"] = EYE_CODE_MAP[meta_record["eye"]]
                meta_record["mode"] = (
                    "P-CR" if meta_record["mode"] == "CR" else meta_record["mode"]
                )
            except AttributeError:
                warn(
                    "Error extracting !MODE RECORD message, "
                    "using default frequency, mode, and eye"
                )
            finally:
                messages = messages.loc[~mode_record]

        self.eye = (
            ("right", "left") if meta_record["eye"] == "both" else (meta_record["eye"],)
        )
        num_recordings = len(self.eye)

        if num_recordings > 1:
            raise NotImplementedError("This script only supports one eye")

        self.metadata["SamplingFrequency"] = float(meta_record["freq"])
        self.metadata["EyeTrackingMethod"] = meta_record["mode"]
        self.metadata["RecordedEye"] = meta_record["eye"].lower()

        # Extract GAZE_COORDS message signaling start of recording
        gaze_msg = messages.message.str.startswith("GAZE_COORDS")

        self.metadata["ScreenAOIDefinition"] = [
            "square",
            DEFAULT_SCREEN,
        ]
        if gaze_msg.any():
            try:
                gaze_record = re.match(
                    r"GAZE_COORDS (\d+\.\d+) (\d+\.\d+) (\d+\.\d+) (\d+\.\d+)",
                    messages[gaze_msg].message.iloc[-1].strip(),
                ).groups()
                self.metadata["ScreenAOIDefinition"][1] = [
                    int(round(float(gaze_record[0]))),
                    int(round(float(gaze_record[2]))),
                    int(round(float(gaze_record[1]))),
                    int(round(float(gaze_record[3]))),
                ]
            except AttributeError:
                warn("Error extracting GAZE_COORDS")
            finally:
                messages = messages.loc[~gaze_msg]

        self.screen_resolution = self.metadata["ScreenAOIDefinition"][1][2:]

        # Extract ELCL_PROC AND ELCL_EFIT_PARAMS to extract pupil fit method
        pupilfit_msg = messages.message.str.startswith("ELCL_PROC")

        if pupilfit_msg.any():
            try:
                pupilfit_method = [
                    val
                    for val in messages[pupilfit_msg]
                    .message.iloc[-1]
                    .strip()
                    .split(" ")[1:]
                    if val
                ]
                self.metadata["PupilFitMethod"] = pupilfit_method[0].lower()
                self.metadata["PupilFitMethodNumberOfParameters"] = int(
                    pupilfit_method[1].strip("(").strip(")")
                )
            except AttributeError:
                warn("Error extracting ELCL_PROC (pupil fitting method)")
            finally:
                messages = messages.loc[~pupilfit_msg]

        pupilfit_msg_params = messages.message.str.startswith("ELCL_EFIT_PARAMS")
        if pupilfit_msg_params.any():
            rows = messages[pupilfit_msg_params]
            row = rows.message.values[-1].strip().split(" ")[1:]
            try:
                self.metadata["PupilFitParameters"] = [
                    tuple(float(val) for val in vals)
                    for k, vals in groupby(row, key=bool)
                    if k
                ]
            except AttributeError:
                warn("Error extracting ELCL_EFIT_PARAMS (pupil fitting parameters)")
            finally:
                messages = messages.loc[~pupilfit_msg_params]

        # Extract VALIDATE messages for a calibration validation
        validation_msg = messages.message.str.startswith("VALIDATE")

        if validation_msg.any():
            self.metadata["ValidationPosition"] = []
            self.metadata["ValidationErrors"] = []

        for i_row, validate_row in enumerate(messages[validation_msg].message.values):
            prefix, suffix = validate_row.split("OFFSET")
            # validation_eye = (
            #     f"eye{self.eye.index('right') + 1}"
            #     if "RIGHT" in prefix
            #     else f"eye{self.eye.index('left') + 1}"
            # )
            validation_coords = [
                int(val.strip())
                for val in prefix.rsplit("at", 1)[-1].split(",")
                if val.strip()
            ]
            self.metadata["ValidationPosition"].append(validation_coords)

            validate_values = [
                float(val)
                for val in re.match(
                    r"(-?\d+\.\d+) deg\.\s+(-?\d+\.\d+),(-?\d+\.\d+) pix\.",
                    suffix.strip(),
                ).groups()
            ]

            self.metadata["ValidationErrors"].append(
                (validate_values[0], tuple(validate_values[1:]))
            )
        messages = messages.loc[~validation_msg]

        # Extract THRESHOLDS messages prior recording and process last
        thresholds_msg = messages.message.str.startswith("THRESHOLDS")
        if thresholds_msg.any():
            # self.metadata["PupilThreshold"] = [None] * len(self.eye)
            # self.metadata["CornealReflectionThreshold"] = [None] * len(self.eye)
            thresholds_chunks = (
                messages[thresholds_msg].message.iloc[-1].strip().split(" ")[1:]
            )
            # eye_index = self.eye.index(EYE_CODE_MAP[thresholds_chunks[0]])
            self.metadata["PupilThreshold"] = int(thresholds_chunks[-2])
            self.metadata["CornealReflectionThreshold"] = int(thresholds_chunks[-1])
        messages = messages.loc[~thresholds_msg]

        # Consume the remainder of messages
        if not messages.empty:
            self.metadata["LoggedMessages"] = [
                (int(msg_timestamp), msg.strip())
                for msg_timestamp, msg in messages[["time", "message"]].values
            ]

        # Normalize timestamps (should be int and strictly positive)
        self.recording = self.recording[
            self.recording["timestamp"] > self.recording.loc[0, "timestamp"]
        ]
        self.recording = self.recording.astype({"timestamp": int})

        self.recording = self.recording.rename(
            columns={
                # Fix buggy header names generated by pyedfread
                "fhxyvel": "fhxvel",
                "frxyvel": "frxvel",
                # Normalize weird header names generated by pyedfread
                "rx": "screen_ppdeg_x_coordinate",
                "ry": "screen_ppdeg_y_coordinate",
                # Convert some BIDS columns
                "time": "timestamp",
            }
        )

        # Split extra columns from the dataframe
        extra_columns = ["flags", "input", "htype"]
        presence = {column: (column in self.recording.columns) for column in extra_columns}
        for _k, is_present in presence.items():
            print(f'is_present: {is_present}')
            if is_present:
                print('here')
                self.extra = self.recording[["flags", "input", "htype"]]
                self.recording = self.recording.drop(columns=["flags", "input", "htype"])
            break

        # Remove columns that are always very close to zero
        self.recording = self.recording.loc[
            :, (self.recording.abs() > 1e-8).any(axis=0)
        ]
        # Remove columns that are always 1e8 or more
        self.recording = self.recording.loc[:, (self.recording.abs() < 1e8).any(axis=0)]
        # Replace unreasonably high values with NaNs
        self.recording = self.recording.replace({1e8: np.nan})

        # Drop one eye's columns if not interested in "both"
        remove_eye = set(("left", "right")) - set(self.eye)
        if remove_eye:
            remove_eye = remove_eye.pop()  # Drop set decoration
            self.recording = self.recording.reindex(
                columns=[c for c in self.recording.columns if remove_eye not in c]
            )

        for eyenum, eyename in enumerate(self.eye):
            # Clean-up implausible values for pupil area (pa)
            self.recording.loc[
                self.recording[f"pa_{eyename}"] < 1, f"pa_{eyename}"
            ] = np.nan
            self.recording = self.recording.rename(
                # columns={f"pa_{eyename}": f"eye{eyenum + 1}_pupil_size"},
                columns={f"pa_{eyename}": f"pupil_size"},
            )

        # Interpolate BIDS column names
        columns = list(
            set(self.recording.columns)
            - set(
                (
                    "timestamp",
                    "screen_ppdeg_x_coordinate",
                    "screen_ppdeg_y_coordinate",
                    "pupil_size",
                    # "eye2_pupil_size",
                )
            )
        )
        bids_columns = []
        for eyenum, eyename in enumerate(self.eye):
            for name in columns:
                # colprefix = f"eye{eyenum + 1}" if name.endswith(f"_{eyename}") else ""
                colprefix = ""  # Assume one eye only
                _newname = name.split("_")[0]
                _newname = re.sub(r"([xy])$", r"_\1_coordinate", _newname)
                _newname = re.sub(r"([xy])vel$", r"_\1_velocity", _newname)
                _newname = _newname.split("_", 1)
                _newname[0] = EDF2BIDS_COLUMNS[_newname[0]]
                _newname.insert(0, colprefix)
                bids_columns.append("_".join((_n for _n in _newname if _n)))

        # Rename columns to be BIDS-compliant
        self.recording = self.recording.rename(columns=dict(zip(columns, bids_columns)))

        # Parse calibration metadata
        self.metadata["CalibrationCount"] = 0
        if not calibration.empty:
            calibration.message = calibration.message.str.replace("!CAL", "")
            calibration.message = calibration.message.str.strip()

            self.metadata["CalibrationLog"] = list(
                zip(
                    calibration.time.values.astype(int).tolist(),
                    calibration.message.values,
                )
            )

            calibrations_msg = calibration.message.str.startswith(
                "VALIDATION"
            ) & calibration.message.str.contains("ERROR")
            self.metadata["CalibrationCount"] = int(calibrations_msg.sum())

            if self.metadata["CalibrationCount"] > 1:
                warn("Calibration of more than one eye is not implemented")

            if self.metadata["CalibrationCount"]:
                calibration_last = calibration.index[calibrations_msg][-1]
                try:
                    meta_calib = re.match(
                        r"VALIDATION (?P<ctype>[\w\d]+) (?P<eyeid>[RL]+) (?P<eye>RIGHT|LEFT) "
                        r"(?P<result>\w+) ERROR (?P<avg>-?\d+\.\d+) avg\. (?P<max>-?\d+\.\d+) max\s+"
                        r"OFFSET (?P<offsetdeg>-?\d+\.\d+) deg\. "
                        r"(?P<offsetxpix>-?\d+\.\d+),(?P<offsetypix>-?\d+\.\d+) pix\.",
                        calibration.loc[calibration_last, "message"].strip(),
                    ).groupdict()

                    self.metadata["CalibrationType"] = meta_calib["ctype"]
                    self.metadata["AverageCalibrationError"] = float(meta_calib["avg"])
                    self.metadata["MaximalCalibrationError"] = float(meta_calib["max"])
                    self.metadata["CalibrationResultQuality"] = meta_calib["result"]
                    self.metadata["CalibrationResultOffset"] = [
                        float(meta_calib["offsetdeg"]),
                        (
                            float(meta_calib["offsetxpix"]),
                            float(meta_calib["offsetypix"]),
                        ),
                    ]
                    self.metadata["CalibrationResultOffsetUnits"] = ["deg", "pixels"]
                except AttributeError:
                    warn("Calibration data found but unsuccessfully parsed for results")

        # Process events: first generate empty columns
        self.recording["fixation"] = 0
        self.recording["saccade"] = 0
        self.recording["blink"] = 0

        # Add fixations
        for _, fixation_event in self.events[
            self.events["type"] == "fixation"
        ].iterrows():
            self.recording.loc[
                (self.recording["timestamp"] >= fixation_event["start"])
                & (self.recording["timestamp"] <= fixation_event["end"]),
                "fixation",
            ] = 1

        # Add saccades, and blinks, which are a sub-event of saccades
        for _, saccade_event in self.events[
            self.events["type"] == "saccade"
        ].iterrows():
            self.recording.loc[
                (self.recording["timestamp"] >= saccade_event["start"])
                & (self.recording["timestamp"] <= saccade_event["end"]),
                "saccade",
            ] = 1

            if saccade_event["blink"] == 1:
                self.recording.loc[
                    (self.recording["timestamp"] >= saccade_event["start"])
                    & (self.recording["timestamp"] <= saccade_event["end"]),
                    "blink",
                ] = 1

        # Reorder columns to render nicely (tracking first, pupil size after)
        # Remove the multiple eyes ordering and eye1_ prefix
        ordering = [
            s.replace("eye1_", "")
            for s in BIDS_COLUMNS_ORDER
            if not s.startswith("eye2_")
        ]
        columns = sorted(
            set(self.recording.columns.values).intersection(ordering),
            key=lambda entry: ordering.index(entry),
        )
        columns += [c for c in self.recording.columns.values if c not in columns]
        self.recording = self.recording.reindex(columns=columns)

        # Finalize BIDS metadata
        self.metadata["Columns"] = self.recording.columns.tolist()

        self.metadata["StartTime"] = (
            self.metadata["StartTimestamp"] - self.recording.timestamp.values[0]
        ) / self.metadata["SamplingFrequency"]

        self.metadata["StopTime"] = (
            self.metadata["StopTimestamp"] - self.recording.timestamp.values[0]
        ) / self.metadata["SamplingFrequency"]

        self.metadata.update(
            json.loads((Path(__file__).parent / "bids_defaults.json").read_text())
        )

        self.metadata.update(
            {
                column: desc
                for column, desc in json.loads(
                    (Path(__file__).parent / "eyelink_columns.json").read_text()
                ).items()
                if column in columns
            }
        )

        # Check whether there are repeated timestamps
        if self.recording.timestamp.duplicated().any():
            warn(
                f"Found {self.recording.timestamp.duplicated().sum()} duplicated timestamps."
            )

        # Insert missing timestamps
        start = self.recording.timestamp.values[0]
        end = self.recording.timestamp.values[-1]

        pre_len = len(self.recording)
        new_index = pd.Index(np.arange(start, end + 1, dtype=int), name="timestamp")
        self.recording.set_index("timestamp").reindex(new_index).reset_index()

        if len(self.recording) != pre_len:
            warn(
                f"Inserted {len(self.recording) - pre_len} missing samples "
                "that would be disallowed by BIDS"
            )

    @classmethod
    def from_edf(
        cls: Type[EyeTrackingRun],
        filename: str | Path,
        message_first_trigger: str,
        message_last_trigger: str,
        trial_marker: bytes = b"",
    ) -> EyeTrackingRun:
        """Create a new run from an EDF file."""
        from pyedfread import edf

        recording, events, messages = edf.pread(str(filename), trial_marker=b"")

        return cls(
            recording=recording,
            events=events,
            messages=messages,
            message_first_trigger=message_first_trigger,
            message_last_trigger=message_last_trigger,
        )


def write_bids(
    et_run: EyeTrackingRun,
    exp_run: str | Path,
) -> List[str]:
    """
    Save an eye-tracking run into a existing BIDS structure.

    Parameters
    ----------
    et_run : :obj:`EyeTrackingRun`
        An object representing an eye-tracking run.
    exp_run : :obj:`os.pathlike`
        The path of the corresponding neuroimaging experiment in BIDS.

    Returns
    -------
    List[str]
        A list of generated files.

    """

    exp_run = Path(exp_run)
    out_dir = exp_run.parent
    refname = exp_run.name
    extension = "".join(exp_run.suffixes)
    suffix = refname.replace(extension, "").rsplit("_", 1)[-1]
    refname = refname.replace(f"_{suffix}", "_eyetrack")

    # Remove undesired entities
    refname = re.sub(r"_part-(mag|phase)", "", refname)
    refname = re.sub(r"_echo-[\w\d]+", "", refname)

    # Write out sidecar JSON
    out_json = out_dir / refname.replace(extension, ".json")
    out_json.write_text(
        json.dumps(et_run.metadata, sort_keys=True, indent=2)
    )

    # Write out data
    out_tsvgz = out_dir / refname.replace(extension, ".tsv.gz")
    et_run.recording.to_csv(
        out_tsvgz,
        sep="\t",
        index=False,
        header=False,
        compression="gzip",
        na_rep="n/a",
    )

    return str(out_tsvgz), str(out_json)

In [57]:
TASK_TRIGGER_MSG = {
    "fixation": ("hello", "bye"),
    "qct": ("hello qct", "bye qct"),
    "rest": ("start movie", "Bye rs"),
    "bht": ("hello bht", "Bye bht"),
}
DATA_PATH = Path("D:\\Eye_Dataset\\Sub001\\230928_anatomical_MREYE_study\\ET_EDF")
bids_name = "JB2.BIDS"
bids_file_path = str(DATA_PATH / bids_name)
task = "fixation"
recording_path = DATA_PATH 

In [58]:
# input: 
# et_run EyeTrackingRun \ Object
# exp_run: str \ Path
# Then I have to initialize a EyeTrackingRun Object??????? myself
# According to the link here:
# https://github.com/TheAxonLab/hcph-sops/blob/mkdocs/code/eyetracking/eyetrackingrun.py
recording_df = recording
events_df = events
messages_df = post_messages

et_run = EyeTrackingRun(
    recording=recording_df,
    events=events_df,
    messages=messages_df,
    message_first_trigger="start",
    message_last_trigger="end",
    metadata=metadata,
)



is_present: False


KeyError: 'pa_right'

Issue 1:
- The code snippet explaining how to convert EDF to BIDS with the function write_bids
- As pointed out in the open issue here:
https://github.com/TheAxonLab/hcph-sops/issues/455

Issue 2:
- How to initialize the variables: message_first_trigger and message_last_trigger
https://github.com/TheAxonLab/hcph-sops/issues/456

Issue 3:
- Appear twice
```
if remove_eye := set(("left", "right")) - set(eye):
    remove_eye = remove_eye.pop()  # Drop set decoration
    recording = recording.reindex(
        columns=[c for c in recording.columns if remove_eye not in c]
    )
```
Issue 4:
- variable `screen_resolution` in the code block need to be initialized
