In [32]:
import subprocess
import pandas as pd
import numpy as np
import os
import shutil
import re

In [33]:
def find_besteye(df_msg, default='R'):
    val_msgs = (df_msg[df_msg['text'].str.contains('CAL VALIDATION')][-2:]).to_numpy(dtype=str)
    if not len(val_msgs) or 'ABORTED' in val_msgs[0][1]:
        return default

    left_index = int('LEFT' in val_msgs[1][1])
    right_index = 1 - left_index
    lefterror_index, righterror_index = val_msgs[left_index][1].split().index('ERROR'), val_msgs[right_index][
        1].split().index('ERROR')
    left_error = float(val_msgs[left_index][1].split()[lefterror_index + 1])
    right_error = float(val_msgs[right_index][1].split()[righterror_index + 1])

    return 'L' if left_error < right_error else 'R'


def filter_msgs(df_msg, cutout='validation'):
    first_index = df_msg.index[df_msg['text'].str.contains(cutout)].tolist()[0]

    return df_msg[first_index:]


def is_binocular(df_fix):
    return len(df_fix['eye'].unique()) > 1


def keep_besteye(df_fix, df_msg, default='R'):
    best_eye = default
    if is_binocular(df_fix):
        best_eye = find_besteye(df_msg, default)
        df_fix = df_fix[df_fix['eye'] == best_eye]

    return df_fix, best_eye


def extract_calpoints(df_msg, best_eye, legend='Calibration points', npoints=9):
    calpoints_msg = df_msg[df_msg['text'].str.contains(legend)]
    calpoints = []
    if not calpoints_msg.empty:
        if len(calpoints_msg) >= 2:
            calpoints_msgidx = calpoints_msg.iloc[-2].name if best_eye == 'L' else calpoints_msg.iloc[-1].name
        else:
            calpoints_msgidx = calpoints_msg.iloc[0].name
        calpoints = df_msg.loc[calpoints_msgidx + 1:calpoints_msgidx + npoints]['text'].to_numpy()
        calpoints = [list(map(lambda x: float(x.replace(',', '')), msg.split()[1:3])) for msg in calpoints]
    calpoints = pd.DataFrame(calpoints, columns=['x', 'y'])

    return calpoints


def extract_valpoints(df_msg, best_eye, legend='VALIDATE', npoints=9):
    valpoints_msg = df_msg[df_msg['text'].str.contains(legend)]
    if len(valpoints_msg) > npoints:
        besteye_legend = 'RIGHT' if best_eye == 'R' else 'LEFT'
        valpoints_msg = valpoints_msg[valpoints_msg['text'].str.contains(besteye_legend)]
    valpoints_msg = valpoints_msg['text'].to_numpy()
    points = [msg.split('at')[1].split('OFFSET')[0].split(',') for msg in valpoints_msg]
    regexp = r'(-?\d+\.\d+)\s*,\s*(-?\d+\.\d+)\s*pix'
    offsets = [re.findall(regexp, msg.split('at')[1].split('OFFSET')[1]) for msg in valpoints_msg]
    offsets = [(float(offset[0][0]), float(offset[0][1])) for offset in offsets]
    points = [(int(point[0]), int(point[1])) for point in points]
    valpoints = pd.DataFrame(points, columns=['x', 'y']).astype(int)
    valoffsets = pd.DataFrame(offsets, columns=['x', 'y']).astype(float)

    return valpoints, valoffsets

In [38]:


def convert_edf_to_ascii(edf_file_path, output_dir=None):
    """
    Convert an EDF file to ASCII format using edf2asc.

    Args:
        edf_file_path (str): Path to the input EDF file.
        output_dir (str): Directory to save the ASCII file. If None, the ASCII file will be saved in the same directory as the input EDF file.

    Returns:
        str: Path to the generated ASCII file.
    """
    # Check if edf2asc is installed
    if not shutil.which("edf2asc"):
        raise FileNotFoundError("edf2asc not found. Please make sure EyeLink software is installed and accessible in the system PATH.")

    # Set output directory
    if output_dir is None:
        output_dir = os.path.dirname(edf_file_path)

    # Generate output file path
    edf_file_name = os.path.basename(edf_file_path)
    ascii_file_name = os.path.splitext(edf_file_name)[0] + ".asc"
    ascii_file_path = os.path.join(output_dir, ascii_file_name)

    # Run edf2asc command with the -f flag, only run it if the file does not already exist
    if not os.path.exists(ascii_file_path):
        subprocess.run(["edf2asc", "-f", edf_file_path, ascii_file_path])

    return ascii_file_path



def parse_edf_eyelink(edf_file_path, msg_keywords):
    """
    Parse an EDF file generated by EyeLink system.

    Args:
        edf_file_path (str): Path to the input EDF file.
        msg_keywords (list of str): List of strings representing keywords to filter MSG lines.

    Returns:
        tuple: A tuple containing five pandas DataFrames:
            - Header information DataFrame
            - MSG lines DataFrame filtered by msg_keywords
            - Calibration information DataFrame
            - EyeLink events DataFrame
            - Raw sample data DataFrame
    """
    # Convert EDF to ASCII
    ascii_file_path = convert_edf_to_ascii(edf_file_path)

    # ===== READ IN FILES ===== #
    # Read in EyeLink file
    
    f = open(ascii_file_path,'r')
    fileTxt0 = f.read().splitlines(True) # split into lines
    fileTxt0 = list(filter(None, fileTxt0)) #  remove emptys
    fileTxt0 = np.array(fileTxt0) # concert to np array for simpler indexing
    f.close()


    # Separate lines into samples and messages
    print('Sorting lines...')
    nLines = len(fileTxt0)
    lineType = np.array(['OTHER']*nLines,dtype='object')


    # Usar lo de mne, particularmente para calibration.
    # En sample tendría que filtrar lo que viene después de START y antes de END.
    

    calibration_flag = False
    start_flag = False
    for iLine in range(nLines):
        if len(fileTxt0[iLine])<3:
            lineType[iLine] = 'EMPTY'
        elif fileTxt0[iLine].startswith('*'):
            lineType[iLine] = 'HEADER'
        # If there is a !CAL in the line, it is a calibration line
        elif '!CAL' in fileTxt0[iLine]:
            lineType[iLine] = 'Calibration'
            calibration_flag = True
        elif fileTxt0[iLine].split()[0] == 'START' and calibration_flag:
            calibration_flag = False
            start_flag = True
        elif calibration_flag:
            lineType[iLine] = 'Calibration'
        elif not start_flag:
            lineType[iLine] = 'EMPTY'
        elif fileTxt0[iLine].split()[0] == 'MSG' and any(keyword in fileTxt0[iLine] for keyword in msg_keywords):
            lineType[iLine] = 'MSG'
        elif fileTxt0[iLine].split()[0][0].isdigit() or fileTxt0[iLine].split()[0].startswith('-'):
            lineType[iLine] = 'SAMPLE'
        else:
            lineType[iLine] = fileTxt0[iLine].split()[0]
        
    # Print the amount of each type of line
    print('Amount of each line type:')
    print(pd.Series(lineType).value_counts())
    
    
    # ===== PARSE EYELINK FILE ===== #
    # Import Header
    print('Parsing header...')
    dfHeader = pd.read_csv(ascii_file_path,skiprows=np.nonzero(lineType!='HEADER')[0],header=None,sep='\s+')
    # Merge columns into single strings
    dfHeader = dfHeader.apply(lambda x: ' '.join(x.dropna().astype(str)), axis=1)


    # Import Calibration
    print('Parsing calibration...')
    iCal = np.nonzero(lineType=='Calibration')[0]
    dfCalib = pd.read_csv(ascii_file_path,skiprows=iCal,header=None)



    # Import Message
    print('Parsing messages...')
    i_msg = np.nonzero(lineType == 'MSG')[0]
    t_msg = []
    txt_msg = []
    for i in range(len(i_msg)):
        # separate MSG prefix and timestamp from rest of message
        info = fileTxt0[i_msg[i]].split()
        # extract info
        t_msg.append(int(info[1]))
        txt_msg.append(' '.join(info[2:]))
    dfMsg = pd.DataFrame({'time': t_msg, 'text': txt_msg})

    # Import Fixations
    print('Parsing fixations...')
    i_not_efix = np.nonzero(lineType != 'EFIX')[0]
    df_fix = pd.read_csv(ascii_file_path, skiprows=i_not_efix, header=None, delim_whitespace=True, usecols=range(1, 8),
                         low_memory=False)
    df_fix.columns = ['eye', 'tStart', 'tEnd', 'duration', 'xAvg', 'yAvg', 'pupilAvg']

    # Saccades
    print('Parsing saccades...')
    i_not_esacc = np.nonzero(lineType != 'ESACC')[0]
    df_sacc = pd.read_csv(ascii_file_path, skiprows=i_not_esacc, header=None, delim_whitespace=True, usecols=range(1, 11),
                          low_memory=False)
    df_sacc.columns = ['eye', 'tStart', 'tEnd', 'duration', 'xStart', 'yStart', 'xEnd', 'yEnd', 'ampDeg', 'vPeak']

    # Blinks
    print('Parsing blinks...')
    df_blink = pd.DataFrame()
    i_not_eblink = np.nonzero(lineType != 'EBLINK')[0]
    if len(i_not_eblink) < nLines:
        df_blink = pd.read_csv(ascii_file_path, skiprows=i_not_eblink, header=None, delim_whitespace=True, usecols=range(1, 5),
                               low_memory=False)
        df_blink.columns = ['eye', 'tStart', 'tEnd', 'duration']

    # determine sample columns based on eyes recorded in file
    eyes_in_file = np.unique(df_fix.eye)
    if eyes_in_file.size == 2:
        cols = ['tSample', 'LX', 'LY', 'LPupil', 'RX', 'RY', 'RPupil']
    else:
        eye = eyes_in_file[0]
        print('monocular data detected (%c eye).' % eye)
        cols = ['tSample', '%cX' % eye, '%cY' % eye, '%cPupil' % eye]

    # Import samples
    i_not_sample = np.nonzero(lineType != 'SAMPLE')[0]
    dfSamples = pd.read_csv(ascii_file_path, skiprows=i_not_sample, header=None, delim_whitespace=True,
                                usecols=range(0, len(cols)), low_memory=False)
    dfSamples.columns = cols
    # Convert values to numbers
    for eye in ['L', 'R']:
        if eye in eyes_in_file:
            dfSamples['%cX' % eye] = pd.to_numeric(dfSamples['%cX' % eye], errors='coerce')
            dfSamples['%cY' % eye] = pd.to_numeric(dfSamples['%cY' % eye], errors='coerce')
            dfSamples['%cPupil' % eye] = pd.to_numeric(dfSamples['%cPupil' % eye], errors='coerce')
        else:
            dfSamples['%cX' % eye] = np.nan
            dfSamples['%cY' % eye] = np.nan
            dfSamples['%cPupil' % eye] = np.nan

    dict_events = {'fix': df_fix, 'sacc': df_sacc, 'blink': df_blink}
    return dfHeader, dfMsg, dfCalib, dfSamples, dict_events



In [39]:
# Example usage:
edf_file_path = "/home/gonzalo/Escritorio/edf_nuevo_dataset/ab01_second_half_2023-10-09_10h50.06.034.asc"
msg_keywords = []
header_df, msg_df, calib_df, raw_sample_df, dict_events = parse_edf_eyelink(edf_file_path, msg_keywords)

# Print the first few rows of each DataFrame
print("Header DataFrame:")
print(header_df.head())
print("\nMSG DataFrame:")
print(msg_df.head())
print("\nCalibration DataFrame:")
print(calib_df.head())
print("\nEyeLink Events DataFrames:")
for event_type, df in dict_events.items():
    print(f"\n{event_type.capitalize()} DataFrame:")
    print(df.head())
print("\nRaw Sample Data DataFrame:")
print(raw_sample_df.head())

Sorting lines...
Amount of each line type:
SAMPLE         828742
SFIX             4323
EFIX             4317
ESACC            4181
SSACC            4181
MSG              1388
EBLINK            587
SBLINK            585
Calibration       549
END                95
PRESCALER          95
VPRESCALER         95
PUPIL              95
EVENTS             95
SAMPLES            95
START              90
EMPTY              61
HEADER             10
OTHER               5
Name: count, dtype: int64
Parsing header...
Parsing calibration...
Parsing messages...
Parsing fixations...


  df_fix = pd.read_csv(ascii_file_path, skiprows=i_not_efix, header=None, delim_whitespace=True, usecols=range(1, 8),


Parsing saccades...


  df_sacc = pd.read_csv(ascii_file_path, skiprows=i_not_esacc, header=None, delim_whitespace=True, usecols=range(1, 11),


Parsing blinks...


  df_blink = pd.read_csv(ascii_file_path, skiprows=i_not_eblink, header=None, delim_whitespace=True, usecols=range(1, 5),
  dfSamples = pd.read_csv(ascii_file_path, skiprows=i_not_sample, header=None, delim_whitespace=True,


Header DataFrame:
0    ** CONVERTED FROM ab01_second_half_2023-10-09_...
1                    ** DATE: Sun Jun 29 00:37:35 2003
2         ** TYPE: EDF_FILE BINARY EVENT SAMPLE TAGGED
3                             ** VERSION: EYELINK II 1
4                                ** SOURCE: EYELINK CL
dtype: object

MSG DataFrame:
      time                                   text
0  2356867            !MODE RECORD CR 1000 2 0 LR
1  2400101                    beginning_of_target
2  2400107                  RECCFG CR 1000 2 0 LR
3  2400107                        ELCLCFG BTABLER
4  2400107  GAZE_COORDS 0.00 0.00 1920.00 1080.00

Calibration DataFrame:
                                                   0
0  ** CONVERTED FROM ab01_second_half_2023-10-09_...
1                  ** DATE: Sun Jun 29 00:37:35 2003
2       ** TYPE: EDF_FILE BINARY EVENT SAMPLE TAGGED
3                           ** VERSION: EYELINK II 1
4                              ** SOURCE: EYELINK CL

EyeLink Events DataFrame:

Fix Dat