In [None]:
import os
import pandas as pd
import numpy as np
import pickle

In [None]:
# Set these manually:
# Identifier of the participant; it is interpolated into the input/output file paths
# and written to the "Subject" column of every processed row.
subject_id = "aaaa"
# Which run of the experiment this recording belongs to; it is interpolated into the
# file names and written to the "Run" column of every processed row.
run_number = 1

In [None]:
# Requires the files to be formatted according to the f-strings below:
# Eye-tracker recording for this subject/run. It is read as UTF-8 text, so presumably this is
# the ASCII-converted EDF — confirm.
filepath = f"data/{subject_id}/{subject_id}S{run_number}.asc"
# Experiment log for the same subject/run (later searched for the session condition / oddball order lines).
log_filepath = f"data/{subject_id}/log{subject_id}S{run_number}.txt"
# Destination of the processed per-line CSV that this notebook writes.
output_filepath = f"./data/processed_data/{subject_id}S{run_number}.csv"

In [None]:
# Load the eye-tracker recording as a list of lines (line endings are kept on each line).
with open(filepath, mode="r", encoding="utf-8") as recording_file:
    raw_data = list(recording_file)

# Load the experiment log the same way.
with open(log_filepath, mode="r", encoding="utf-8") as log_file:
    log_data = list(log_file)

In [None]:
# Separate out the sessions, starting at adaptation and ending when the recording ends for that session.
# Sessions get stored in a dict called "session_data", with keys ranging from session_1 to session_4.
keep_line = False  # True while we are inside a session (between ADAPTATION_START and the next END line)
session_idx = 0    # 1-based number of the session currently being collected (0 = no session seen yet)
session_data = {
    "session_1": [],
    "session_2": [],
    "session_3": [],
    "session_4": [],
}

for line in raw_data:
    if "ADAPTATION_START" in line:
        # A new session begins; the ADAPTATION_START line itself is the first line stored for it.
        keep_line = True
        session_idx += 1
    elif line.startswith("END"):
        # The recording ended. The END line is stored as the final line of the session, but only
        # if a session is actually open. Previously an END line that appeared before the first
        # ADAPTATION_START raised KeyError('session_0'), and a repeated END line was silently
        # appended to the session that had already been closed.
        if keep_line:
            session_data[f"session_{session_idx}"].append(line)
        keep_line = False
        continue

    if keep_line:
        session_data[f"session_{session_idx}"].append(line)

In [None]:
# Create a dictionary that will hold all the data which is to be categorised into either the
# session's adaptation period or to be placed in a list for each block within that session:
extracted_data = {}

# Create an individual dictionary for each session that has separate data per event/trial:
for idx in range(4):
    extracted_data[f"session_{idx+1}"] = {
        "adaptation_period": [],            # Lines from ADAPTATION_START up to and including ADAPTATION_END
        "adaptation_period_overspill": [],  # Lines after ADAPTATION_END but before the first BLOCK_START
        "raw_blocks": {}                    # {block number: [every line from that block's BLOCK_START onwards]}
    }

for session in extracted_data.keys():
    adaptation_end_found = False
    current_block = 0  # 0 means "no BLOCK_START seen yet" for this session

    for line in session_data[session]:
        # Add the adaptation period data to its own key:
        if not adaptation_end_found:
            extracted_data[session]["adaptation_period"].append(line)
            if "ADAPTATION_END" in line:
                adaptation_end_found = True
            # The ADAPTATION_END line belongs to the adaptation period only. It used to fall through
            # to the code below and was stored a second time in "adaptation_period_overspill".
            continue

        # The adaptation period is over, so we can start adding blocks.
        # The block number is the third "_"-separated piece of the BLOCK_START line:
        if "BLOCK_START" in line:
            current_block = int((line.split("_")[2]))

        if current_block == 0:
            # No block has started yet, so this line is 'overspill' of the adaptation period:
            extracted_data[session]["adaptation_period_overspill"].append(line)
        else:
            # Blocks are keyed by the number read from the file, so the first block is accessed with
            # extracted_data[session]["raw_blocks"][1]. The list is created on first use.
            extracted_data[session]["raw_blocks"].setdefault(current_block, []).append(line)

# Explanation of data structure:
We now have a dictionary called `extracted_data`.

Within `extracted_data` are four more dictionaries called `session_1`, `session_2`, `session_3`, and `session_4`.

Each of these 'session' dictionaries contains 3 keys: `adaptation_period`, `adaptation_period_overspill`, and `raw_blocks`.

## Adaptation Period:
This is a list containing each line from the ASCII EDF file, starting from the first line containing "ADAPTATION_START", and ending with the first following instance of "ADAPTATION_END" for that session, as received by the eye-tracker in its EDF file.

## Adaptation Period Overspill:
This is also a list of lines, like the 'Adaptation Period' list; however, these are the lines that fall between that same "ADAPTATION_END" and the first instance of "BLOCK_START" for that session. Effectively, it is any data that was logged to the EDF but wasn't officially captured during the adaptation period for whatever reason. It's usually around 11ms worth of data.

## Raw Blocks:
This is a dictionary with keys starting from 1 and continuing up to the total number of blocks for that session. For each block, there is a list containing every line recorded during that block. The lines begin with the first instance of "BLOCK_START" for that block, and end with the very last line before the next instance of "BLOCK_START", therefore including every line of the EDF file for that particular block.

In [None]:
# Split every raw block into its trials. A trial starts at a line containing "TRIALID_VAR" and
# runs until the next such line (or the end of the block). Lines that come before the first
# "TRIALID_VAR" of a block are not stored anywhere in "blocks".
for session_key, session_entry in extracted_data.items():

    session_entry["blocks"] = {}

    for block_idx, block_lines in session_entry["raw_blocks"].items():

        trials = {}
        session_entry["blocks"][block_idx] = {"trials": trials}

        # 0 is used as "no trial started yet"; it never becomes a key unless the file itself says so.
        curr_trial = 0

        for line in block_lines:

            if "TRIALID_VAR" in line:
                # The trial number is the third "_"-separated piece of the text that follows
                # "TRIALID " (up to the first comma). Start a fresh list for it.
                curr_trial = int(line.split("TRIALID ")[1].split(",")[0].split("_")[2])
                trials[curr_trial] = []

            # Store the line only once a trial has been opened:
            if curr_trial in trials:
                trials[curr_trial].append(line)

# New Data Structure
Within the dictionary called `extracted_data`, there is now a new dictionary under the key `blocks`.

`blocks` is like `raw_blocks`, having the same keys indexed starting at 1, and incrementing up until all blocks for a particular session have been included.

Within a given block index under `blocks` is a new dictionary called `trials`, which itself is indexed starting at 1 and going up until all trials for that block are included. Within these indexed trials is a list containing all lines associated with that trial.

Thus, to access all the lines associated with the second trial in block 15 for session 2, you would call `extracted_data["session_2"]["blocks"][15]["trials"][2]`.

# Making a Pandas-Compatible Dictionary:
We now have a human-friendly dictionary in which we can access the raw line data by specifying a session, block, and trial number. However, because this line data is raw in format, and its formatting changes depending on the type of message sent to the EDF during recording, we need to process it in a way such that every line has the same formatting and can then be filtered based on what we're looking for.

To achieve the filtering, I'll use Pandas, which requires every line to have the same formatting anyway. Thus, I will iterate over the `extracted_data` dict and store the reformatted data lines in a list called `data_for_pd` which will be used to create the Pandas DataFrame:

In [None]:
# Define a function that can take in a line and convert it to the dictionary format we need for the Pandas DataFrame.
# The small helpers below are the different places a timestamp is read from in a non-sample line:
def _timestamp_after_first_tab(text):
    """Read the first space-delimited token that follows the first tab."""
    return int(text.split("\t")[1].split(" ")[0])


def _timestamp_second_tab_field(text):
    """Read the whole second tab-delimited field."""
    return int(text.split("\t")[1])


def _timestamp_last_space_token(text):
    """Read the last space-delimited token of the line."""
    return int(text.split(" ")[-1].strip())


# (marker, "Message Type" label, timestamp reader). The order matters: the first marker found in the
# line wins, so "TRIALID_VAR" must be tested before "TRIALID", and the generic "END" must come last.
_MESSAGE_RULES = (
    ("TRIALID_VAR", "Trial Start", _timestamp_after_first_tab),
    ("TRIALID", "Trial Start 2", _timestamp_after_first_tab),
    ("MAIN_STIM_ONSET", "Main Stimulus Onset", _timestamp_after_first_tab),
    ("MAIN_STIM_OFFSET", "Main Stimulus Offset", _timestamp_after_first_tab),
    ("FIXATION_STIM_ONSET", "Fixation Stimulus Onset", _timestamp_after_first_tab),
    ("INTERIM_START", "Interim Onset", _timestamp_after_first_tab),
    ("INTERIM_END", "Interim Offset", _timestamp_after_first_tab),
    ("KEY_RESPONSE", "Key Response", _timestamp_after_first_tab),
    ("EFIX", "Fixation Interrupted", _timestamp_second_tab_field),
    ("SFIX", "Fixation Resumed", _timestamp_last_space_token),
    ("SSACC", "Saccade Start", _timestamp_last_space_token),
    ("ESACC", "Saccade End", _timestamp_second_tab_field),
    # These messages are always ahead by 1:
    ("SBLINK", "Blink Start", lambda text: int(text.split(" ")[2].strip()) - 1),
    # These messages are always behind by 1:
    ("EBLINK", "Blink End", lambda text: _timestamp_after_first_tab(text) + 1),
    ("END", "Session End", lambda text: int(text.split(" ")[0].replace("END", "").strip())),
)


def line_to_dict(
        entry, session_num, block_num, trial_num, session_con, session_common_ob,
        trial_con=None, main_stim_visibility=None, main_stim_type=None, target_status=None, att_letter=None
):
    """Convert one raw line of the recording into a flat dict with a fixed set of keys.

    The notebook-level variables `subject_id` and `run_number` are read to fill the
    "Subject" and "Run" fields.

    Args:
        entry: The raw line (str). A line whose first character is numeric is treated as a
            sample line; anything else is treated as a message/event line.
        session_num, block_num, trial_num: Stored as "Session", "Block" and "Trial".
        session_con: Stored as "Session Condition".
        session_common_ob: Stored as "Session Common Oddball".
        trial_con: Stored as "Trial Condition".
        main_stim_visibility: Stored as "Main Stimulus Visibility".
        main_stim_type: Stored as "Main Stimulus Type".
        target_status: Stored as "Target Status".
        att_letter: Stored as "Attention Letter".

    Returns:
        A dict whose keys are always the same and always in the same order. For a sample line,
        "Message Type" is "Data" and "Gaze X"/"Gaze Y"/"Pupil" hold the stripped text of the
        2nd-4th tab-separated fields, but only when the 4th field is greater than 0 (otherwise
        they stay None). For a message line, "Message Type" is the label of the first matching
        marker and "Timestamp" is parsed from the line; if no marker matches, the line is
        printed, "Message Type" stays "Message" and "Timestamp" stays None.
    """
    row = {
        "Subject": subject_id,
        "Run": run_number,                                  # Defines whether it was the 1st or 2nd run of the experiment (int - 1 or 2)
        "Timestamp": None,
        "Message Type": None,
        "Session": session_num,                             # The session number
        "Session Condition": session_con,                   # Either "Attend" or "Divert"
        "Session Common Oddball": session_common_ob,        # Either "fine gabor" or "noise disk"
        "Block": block_num,                                 # The block number
        "Trial": trial_num,                                 # The trial number
        "Trial Condition": trial_con,                       # Either "Standard" or "Oddball"
        "Main Stimulus Visibility": main_stim_visibility,   # A bool - True iff the main stimulus was visible at that timestamp
        "Main Stimulus Type": main_stim_type,               # A string describing which of the three possible main stimuli was shown
        "Target Status": target_status,                     # A bool - True iff the target attention stimulus is showing.
        "Attention Letter": att_letter,                     # The letter used (can be passed as None during attend conditions)
        "Gaze X": None,
        "Gaze Y": None,
        "Pupil": None,
    }

    # A line that starts with a number is a typical line of data (a sample):
    if entry[0].isnumeric():
        fields = entry.split("\t")
        row["Message Type"] = "Data"
        row["Timestamp"] = int(fields[0])
        if float(fields[3].strip()) > 0: # Handles blinks
            row["Gaze X"] = fields[1].strip()
            row["Gaze Y"] = fields[2].strip()
            row["Pupil"] = fields[3].strip()
        return row

    # Otherwise it's a message/event line; label it according to the first marker it contains:
    row["Message Type"] = "Message"
    for marker, label, read_timestamp in _MESSAGE_RULES:
        if marker in entry:
            row["Timestamp"] = read_timestamp(entry)
            row["Message Type"] = label
            break
    else:
        print(f"Unhandled message type:\n{entry}\n")

    return row



## Additional line processing:
The `line_to_dict` function above handles the basic extraction of data from each line, such as the timestamp of the message, and the basic formatting. However, because each line of data doesn't necessarily contain information that we want to persist through each line (for instance, a typical line of data doesn't tell us whether the main stimulus condition is a standard or oddball for that trial), we need to work this out outside of the function as we iterate over each line, then pass that information into the function along with the line.

In [None]:
data_for_pd = []
# The attention-stimulus state is deliberately carried over from one trial to the next (see the 2nd
# loop of trial data below), so it is initialised once, before any session is processed.
target_is_on = False
curr_letter = None

# This debug list is used to display the order in which the sessions, blocks, and trials were processed, useful for debugging.
# The list is used instead of printing because Jupyter's buffer is overloaded by the rapid processing steps, causing printed debug messages to become jumbled.
debug_log = []


def _order_from_log(log_lines, label):
    """Return the list of names written in square brackets on the first log line containing `label`.

    Raises:
        ValueError: if no line of the log contains `label` (previously this surfaced later
            as a NameError on the variable that was never assigned).
    """
    for log_line in log_lines:
        if label in log_line:
            return log_line.split("[")[-1].replace("]", "").replace("'", "").strip().split(", ")
    raise ValueError(f"Could not find '{label}' in the log file")


# Get the order of the behavioural conditions ('attend' or 'divert' conditions) from the log:
con_order = _order_from_log(log_data, "Session Condition Order:")

# Get the order of the common oddballs from the log:
oddball_order = _order_from_log(log_data, "Common Oddball Order:")


# Iterate over each session, and store the session number and the condition in a variable to pass on to the function:
for session_name, session_condition, session_common_oddball in zip(extracted_data.keys(), con_order, oddball_order):
    session = int(session_name[-1])

    # DEBUG:
    debug_log.append(f"Session: {session}")

    for block in extracted_data[session_name]["blocks"]: # Iterate over the blocks, storing the block number to pass on.

        # DEBUG:
        debug_log.append(f"Session: {session}\tBlock: {block}")

        # This is the first loop through the trial data - it finds the information it needs to pass into the 2nd loop through the data:
        for trial, trial_data in extracted_data[session_name]["blocks"][block]["trials"].items():

            # DEBUG:
            debug_log.append(f"Session: {session}\tBlock: {block}\tTrial: {trial}")

            # Create a dict to store the relevant info, including onset and offset indexes for the relevant lines, and info about the trial:
            trial_info = {
                "main_stim_onset_idx": None,
                "main_stim_offset_idx": None,
                "main_stim_type": None,
                "fixation_stim_onset_idxs": [],
                "fixation_stim_offset_idxs": [],
                "fixation_types": [],
                "fixation_letters": [],
                "trial_condition": None,
            }

            for data_line_idx, data_entry in enumerate(trial_data):
                if "MAIN_STIM_ONSET" in data_entry:
                    trial_info["main_stim_onset_idx"] = data_line_idx
                    trial_info["main_stim_type"] = data_entry.split("Stim: ")[-1].strip()

                elif "MAIN_STIM_OFFSET" in data_entry:
                    trial_info["main_stim_offset_idx"] = data_line_idx

                elif "FIXATION_STIM_ONSET" in data_entry:
                    if len(trial_info["fixation_stim_onset_idxs"]) > 0: # If not the first fixation stim presentation, add an offset time for the prior one:
                        trial_info["fixation_stim_offset_idxs"].append(data_line_idx - 1)
                    trial_info["fixation_stim_onset_idxs"].append(data_line_idx)
                    attention_type, attention_letter = data_entry.split("Attention_type: ")[-1].split(", Letter: ")
                    attention_letter = attention_letter.replace(",FrameN: 0", "").strip()
                    trial_info["fixation_types"].append(attention_type)
                    trial_info["fixation_letters"].append(attention_letter)

                elif "INTERIM_END" in data_entry:
                    # The last fixation stimulus of the trial ends on the line before INTERIM_END:
                    trial_info["fixation_stim_offset_idxs"].append(data_line_idx - 1)

                elif "TRIALID_VAR" in data_entry:
                    trial_condition_letter = data_entry.split("Type: ")[-1].strip()
                    if trial_condition_letter == "s":
                        trial_info["trial_condition"] = "Standard"
                    elif trial_condition_letter == "o":
                        trial_info["trial_condition"] = "Oddball"
                    else:
                        trial_info["trial_condition"] = f"Error: Got {trial_condition_letter}"

            main_stim_onset = trial_info["main_stim_onset_idx"]
            main_stim_offset = trial_info["main_stim_offset_idx"]
            fixation_onsets = trial_info["fixation_stim_onset_idxs"]

            # A trial that lacks some of its stimulus messages (e.g. the recording stopped part-way
            # through it) used to crash the loop below with a TypeError (comparison against None) or
            # an IndexError (empty onset list). It is now processed with what is available and noted
            # in the debug log.
            if main_stim_onset is None or main_stim_offset is None or not fixation_onsets:
                debug_log.append(
                    f"Session: {session}\tBlock: {block}\tTrial: {trial}\tWARNING: incomplete trial (missing stimulus messages)"
                )

            # Loop through the same trial again, this time with the trial info contained in the trial_info dict:
            for data_line_idx, data_entry in enumerate(trial_data):

                # Check if the main stimulus was visible when this data line was written:
                if main_stim_onset is None:
                    # The stimulus never came on in this trial.
                    main_stim_is_visible = False
                elif main_stim_offset is None:
                    # The stimulus came on but no offset was logged: visible from the onset line onwards.
                    main_stim_is_visible = data_line_idx >= main_stim_onset
                else:
                    main_stim_is_visible = main_stim_onset <= data_line_idx < main_stim_offset

                # Find the fixation index period where the current data entry will fall,
                # and check if the target was showing, and what letter was showing (if any).
                # Lines before the first fixation stimulus of the trial (or every line, if the trial has
                # none) have nothing pertaining to the on-screen attention stimulus, so the values from
                # the previous trial are kept. In the very first trial of an experiment there is no
                # previous info, which is why target_is_on starts as False and curr_letter as None.
                if fixation_onsets and data_line_idx >= fixation_onsets[0]:
                    for fix_onset, fix_offset, fix_letter, fix_status in zip(
                            fixation_onsets,
                            trial_info["fixation_stim_offset_idxs"],
                            trial_info["fixation_letters"],
                            trial_info["fixation_types"]
                    ):
                        # NOTE(review): the onset line itself is excluded here (strict "<") while the
                        # main-stimulus check above includes its onset line ("<="), so each
                        # FIXATION_STIM_ONSET row keeps the previous stimulus' letter/status. Left
                        # unchanged - confirm whether that is intended.
                        if fix_onset < data_line_idx <= fix_offset:
                            # Define the letter during this period:
                            if fix_letter == "None":
                                curr_letter = None
                            else:
                                curr_letter = fix_letter

                            # Define the status of the attention stimulus (was it a target or standard attention stimulus?):
                            if fix_status == "Normal":
                                target_is_on = False
                            elif fix_status == "Target":
                                target_is_on = True
                            else:
                                print(f"ERROR: Unexpected target value: {fix_status}")
                            break

                line_as_dict = line_to_dict(
                    entry=data_entry,
                    session_num=session,
                    block_num=block,
                    trial_num=trial,
                    session_con=session_condition,
                    session_common_ob=session_common_oddball,
                    trial_con=trial_info["trial_condition"],
                    main_stim_visibility=main_stim_is_visible,
                    main_stim_type=trial_info["main_stim_type"],
                    target_status=target_is_on,
                    att_letter=curr_letter
                )

                data_for_pd.append(line_as_dict)

In [None]:
# Build the DataFrame with one row per processed line. Every dict in data_for_pd comes from
# line_to_dict and has the same keys, which become the columns.
df = pd.DataFrame(data_for_pd)

In [None]:
# DataFrame.to_csv does not create missing folders, and "data/processed_data" is otherwise only
# created by a later cell, so make sure the output folder exists before writing.
output_dir = os.path.dirname(output_filepath)
if output_dir:
    os.makedirs(output_dir, exist_ok=True)

df.to_csv(output_filepath)

In [None]:
# Show the order in which sessions, blocks and trials were processed, one entry per line.
if debug_log:
    print("\n".join(debug_log))

# Calculate baseline periods for the first trial of each session
These are needed so that the first trial of each session can have a baseline from which to normalise the pupil data. All other trials can use the respective prior trial's last 200ms to calculate these from, but we need to use the adaptation period for the first one.

In [None]:
# For each session, average the pupil size over the samples found in the 500 lines that precede
# the final line of the adaptation period, then save the four baselines to a pickle.
baselines = {}
for session in [1, 2, 3, 4]:
    print(f"Calculating Baselines for Session: {session}")

    # The last stored line of the adaptation period (the ADAPTATION_END message, when present) is
    # skipped; the 500 lines before it form the baseline window. Message/event lines inside the
    # window are ignored below, so the number of samples actually used can be lower than 500.
    baseline_period = extracted_data[f"session_{session}"]["adaptation_period"][-501:-1]

    baseline_measurements = []
    for entry in baseline_period:
        if not entry[0].isnumeric():
            continue  # Not a sample line
        pupil_size = float(entry.split("\t")[3].strip())
        # A pupil size of 0 marks missing data (line_to_dict treats it as a blink), so it must not
        # be averaged into the baseline. Previously these zeros pulled the baseline down.
        if pupil_size > 0:
            baseline_measurements.append(pupil_size)

    if baseline_measurements:
        ses_baseline = np.array(baseline_measurements).mean()
    else:
        # No usable sample in the window: store NaN explicitly instead of relying on the mean of an
        # empty array (which also yields NaN, but with a RuntimeWarning).
        ses_baseline = np.float64("nan")
        print(f"WARNING: no valid pupil samples found for the baseline of session {session}")

    baselines[f"Session {session}"] = ses_baseline
    print(f"Baseline taken from {len(baseline_measurements)} entries")
    print(f"Baseline: {ses_baseline}\n")

os.makedirs(f"data/processed_data/baselines/{subject_id}", exist_ok=True)

baseline_filename = f"data/processed_data/baselines/{subject_id}/run_{run_number}_baselines.pkl"
with open(baseline_filename, 'wb') as f: # 'wb' for write binary
    pickle.dump(baselines, f)

# Create a dictionary of final blinks and saccades to pass on to the analysis workbook
This is necessary because we're not passing along the adaptation phase information in the .csv as it's not of interest to our analysis.
However, a blink or saccade could have started during this window of time and then ended during the first trial.
If it ends during the first trial and we don't have the start message, then we will have start and end times that are incorrectly paired together.
Instead, we'll process this data here and pass it on.

In [None]:
# For each session, record the timestamp of a blink and/or saccade that started before the first
# block (adaptation period + overspill) and had not ended by then. None means there was no such event.
blinks_and_saccades = {
    f"session_{session_number}": {"blinks": None, "saccades": None}
    for session_number in [1, 2, 3, 4]
}

for session in [1, 2, 3, 4]:
    session_key = f"session_{session}"

    # The adaptation period and its overspill are scanned in order, as one sequence of lines:
    pre_block_lines = (
        extracted_data[session_key]["adaptation_period"]
        + extracted_data[session_key]["adaptation_period_overspill"]
    )

    blink_data = []    # ("Start" | "End", timestamp) in file order
    saccade_data = []  # ("Start" | "End", timestamp) in file order

    for entry in pre_block_lines:
        if entry[0].isnumeric():
            continue  # Sample line - only event lines are of interest here

        if "SSACC" in entry:
            saccade_data.append(("Start", int(entry.split(' ')[-1].strip())))
        elif "ESACC" in entry:
            saccade_data.append(("End", int(entry.split('\t')[1])))
        elif "SBLINK" in entry:
            blink_data.append(("Start", int(entry.split(" ")[2].strip()) - 1))
        elif "EBLINK" in entry:
            blink_data.append(("End", int(entry.split("\t")[1].split(" ")[0]) + 1))

    # Only an event whose last record is a "Start" is still open; the emptiness check avoids indexing errors:
    if saccade_data and saccade_data[-1][0] == "Start":
        blinks_and_saccades[session_key]["saccades"] = saccade_data[-1][-1]
    if blink_data and blink_data[-1][0] == "Start":
        blinks_and_saccades[session_key]["blinks"] = blink_data[-1][-1]

print(blinks_and_saccades)

os.makedirs(f"data/processed_data/final_blinks_and_saccades/{subject_id}", exist_ok=True)

blinks_and_saccades_filename = f"data/processed_data/final_blinks_and_saccades/{subject_id}/run_{run_number}.pkl"
with open(blinks_and_saccades_filename, 'wb') as f: # 'wb' for write binary
    pickle.dump(blinks_and_saccades, f)