# MultiplEYE preprocessing

In [None]:
from pathlib import Path

# Input locations (relative to the notebook) and the directory that receives
# everything this notebook generates.
data_root = Path("..") / "data"

# One eye-tracking session of the SQ_CH data collection.
data_dir = (
    data_root
    / "MultiplEYE_SQ_CH_Zurich_1_2025"
    / "eye-tracking-sessions"
    / "006_SQ_CH_1_ET1"
)

# Stimulus material belonging to the same data collection.
stim_dir = data_root / "stimuli_MultiplEYE_SQ_CH_Zurich_1_2025"

# Created up front so later cells can write into it directly.
output_dir = Path("output")
output_dir.mkdir(exist_ok=True)


## EDF to ASC

Use the `edf2asc` binary from the [EyeLink Developers Kit](https://www.sr-research.com/support/thread-13.html) to convert EDF files to ASC files.

Notes:
- We probably can't distribute the binary due to licensing issues. (But we might be able to distribute a Docker image?)
- There is already an [issue](https://github.com/aeye-lab/pymovements/issues/509) to integrate this into `pymovements`.
- The `-input` option is unnecessary, but currently required by `parse_eyelink()` in `pymovements`.

In [None]:
import subprocess


def convert_edf_to_asc(edf_path: Path, output_dir: Path) -> Path:
    """Convert an EyeLink EDF file to ASC by calling the external ``edf2asc`` tool.

    The ASC file is written to ``output_dir`` under the EDF file's stem. If that
    file already exists, the conversion is skipped and the existing path is
    returned.

    Args:
        edf_path: Path to the ``.edf`` file to convert.
        output_dir: Directory that receives the ``.asc`` file. It is created
            (including missing parent directories) if necessary.

    Returns:
        Path of the ASC file.

    Raises:
        RuntimeError: If no ASC file exists after ``edf2asc`` has run.
        FileNotFoundError: Propagated from ``subprocess.run`` when the
            ``edf2asc`` executable cannot be found.
    """
    # parents=True: a nested output directory must not fail before conversion.
    output_dir.mkdir(parents=True, exist_ok=True)
    asc_file = output_dir / f"{edf_path.stem}.asc"

    if asc_file.exists():
        print(f"[INFO] ASC file already exists at {asc_file}, skipping conversion.")
        return asc_file

    cmd = [
        "edf2asc",
        "-input",  # currently required by pymovements' parse_eyelink()
        "-ftime",
        "-p",  # followed by the directory that receives the ASC file
        str(output_dir),
        "-y",
        str(edf_path),
    ]

    # check=False on purpose: success is judged by the presence of the ASC file
    # below, not by the exit code.
    result = subprocess.run(cmd, capture_output=True, text=True, check=False)

    print(result.stdout)  # always show stdout, because edf2asc prints progress here
    if result.stderr:
        print("[stderr]", result.stderr)

    if not asc_file.exists():
        # Only raise an error if no output file exists.
        raise RuntimeError(
            f"edf2asc failed to produce an ASC file. Exit code: {result.returncode}"
        )

    print(f"[INFO] Conversion completed, ASC file created at {asc_file}")
    return asc_file


# Reuse the notebook-wide output directory instead of re-spelling its path.
asc_file = convert_edf_to_asc(data_dir / "006sqch1.edf", output_dir)

## ASC to sample-level CSV

Convert the ASC file to CSV files (one per stimulus, with a `screen` column identifying the page) where each row is a sample.

In [None]:
# Message patterns handed to `pm.gaze.from_asc()` to derive additional columns.
# Two forms are used:
#   - plain regex strings, whose named groups (`trial`, `screen`) supply the values;
#   - dicts that assign a fixed `value` to `column` when `pattern` matches.
# Every `stop_recording_` entry resets its column to None.
# NOTE(review): the list order is kept exactly as is, in case matching depends
# on it — confirm against the pymovements documentation before reordering.
patterns = [
    # Trial name (optionally prefixed with PRACTICE_) and screen name.
    r"start_recording_(?P<trial>(?:PRACTICE_)?trial_\d+)_(?P<screen>.+)",
    # Same, restricted to the rating screens.
    r"start_recording_(?P<trial>(?:PRACTICE_)?trial_\d+)_(?P<screen>familiarity_rating_screen_\d+|subject_difficulty_screen)",
    {"pattern": r"stop_recording_", "column": "trial", "value": None},
    {"pattern": r"stop_recording_", "column": "screen", "value": None},
    # Activity label: text pages -> "reading".
    {
        "pattern": r"start_recording_(?:PRACTICE_)?trial_\d+_page_\d+",
        "column": "activity",
        "value": "reading",
    },
    # Activity label: question screens -> "question".
    {
        "pattern": r"start_recording_(?:PRACTICE_)?trial_\d+_question_\d+",
        "column": "activity",
        "value": "question",
    },
    # Activity label: familiarity / difficulty screens -> "rating".
    {
        "pattern": r"start_recording_(?:PRACTICE_)?trial_\d+_(familiarity_rating_screen_\d+|subject_difficulty_screen)",
        "column": "activity",
        "value": "rating",
    },
    {"pattern": r"stop_recording_", "column": "activity", "value": None},
    # Practice flag: True for PRACTICE_ trials, False for regular trials.
    {
        "pattern": r"start_recording_PRACTICE_trial_",
        "column": "practice",
        "value": True,
    },
    {
        "pattern": r"start_recording_trial_",
        "column": "practice",
        "value": False,
    },
    {"pattern": r"stop_recording_", "column": "practice", "value": None},
]

### Parse ASC file

In [None]:
import csv

import polars as pl
import pymovements as pm

# Alternative input file (currently disabled):
# asc = output_dir / "ch1hr007.asc"
# ASC file to parse; same name and directory as the output of the EDF conversion above.
asc = output_dir / "006sqch1.asc"

In [None]:
# Pymovements from_asc():
# patterns: str | list[dict[str, Any] | str] | None
#   List of patterns to match for additional columns or a key identifier of eye tracker specific
#   default patterns. Supported values are: `'eyelink'`. If `None` is passed, `'eyelink'` is
#   assumed. (default: None)

# Question: how do I know what patterns to use for my data?

In [None]:
# Parse the ASC file using the patterns predefined above (from Andreas), so the
# additional columns they describe become part of the sample frame.
gazePatterns = pm.gaze.from_asc(
    asc,
    patterns=patterns,
)
# Display the resulting sample-level frame.
gazePatterns.frame

In [None]:
# default frame without passing patterns (pymovements then assumes 'eyelink')
# gazeNoPatterns = pm.gaze.from_asc(
#     asc,
# )
# gazeNoPatterns.frame

In [None]:
# Inspect the metadata stored on the parsed gaze object; its "sampling_rate"
# entry is read later when the experiment is defined.
# NOTE(review): `_metadata` has a leading underscore, i.e. by Python convention
# it is a non-public attribute and may change between pymovements versions.
gazePatterns._metadata

In [None]:
# Display the events attached to the parsed gaze object.
gazePatterns.events

### Map trial numbers to stimulus IDs

In [None]:
# Lookup from the trial names found in the ASC messages to stimulus identifiers.
# The two practice trials have fixed identifiers.
stimulus_ids = {
    "PRACTICE_trial_1": "practice1",
    "PRACTICE_trial_2": "practice2",
}

# Regular trials are numbered from 1 in the row order of the completed-stimuli
# log. newline="" is what the csv module documents for files passed to a
# reader, so that newlines embedded in quoted fields are handled correctly.
with open(data_dir / "logfiles" / "completed_stimuli.csv", newline="") as f:
    for trial_number, row in enumerate(csv.DictReader(f), start=1):
        stimulus_ids[f"trial_{trial_number}"] = f"stimulus{row['stimulus_id']}"

# Add a `stimulus_id` column by translating the `trial` column through the lookup.
df = gazePatterns.frame.with_columns(
    pl.col("trial").replace(stimulus_ids).alias("stimulus_id")
)
df

### Write separate CSVs for each stimulus

Unnest the `[x, y]` `pixel` column into separate `pixel_x` and `pixel_y` columns. This is necessary because polars does not support nested values when exporting CSV.

In [None]:
# Replace the nested [x, y] `pixel` column with two flat columns, appended
# after all remaining columns.
pixel_xy = pl.col("pixel")
df = df.select(
    pl.all().exclude("pixel"),
    pixel_xy.list.get(0).alias("pixel_x"),
    pixel_xy.list.get(1).alias("pixel_y"),
)
df

Split data into CSV files.

In [None]:
raw_dir = output_dir / "raw"
raw_dir.mkdir(exist_ok=True, parents=True)

# Derive the subject number from the session directory rather than hard-coding
# it: the file name used to be fixed to "S007" although the session loaded in
# this notebook is "006_SQ_CH_1_ET1".
# NOTE(review): assumes the session directory name starts with the participant
# number followed by "_" — confirm for other sessions. Files written by earlier
# runs under the old "S007_" prefix are not removed.
subject_id = data_dir.name.split("_")[0]

# Columns written to each per-stimulus CSV, in this order.
export_columns = ["time", "screen", "pixel_x", "pixel_y", "pupil"]

for stimulus_id in df["stimulus_id"].unique():
    if stimulus_id is None:
        # Samples without a stimulus_id are not exported.
        continue
    stimulus_df = df.filter(pl.col("stimulus_id") == stimulus_id).select(
        export_columns
    )
    stimulus_df.write_csv(raw_dir / f"S{subject_id}_{stimulus_id}.csv")

## ⬇️ Everything from this point on would be part of the published preprocessing pipeline ⬇️

## Dataset definition

In [None]:
from dataclasses import dataclass, field
import pymovements as pm


@dataclass
class Multipleye(pm.DatasetDefinition):
    """Dataset definition for the per-stimulus MultiplEYE sample CSVs.

    Describes how the CSV files are named and which of their columns hold the
    timestamps, the pixel coordinates and the trial identifiers.
    """

    name: str = "Multipleye"

    # File name pattern: subject number and stimulus identifier.
    filename_format: str = r"S{subject_id:d}_{stimulus_id}.csv"

    # Data types of the values extracted from the file name.
    # This must be an annotated field: a bare class attribute is not picked up
    # by @dataclass, so the default inherited from the base definition would
    # win on instances and these dtypes would be ignored.
    # Only placeholders of `filename_format` are listed; `screen` is a column
    # inside the CSV, not part of the file name.
    # NOTE(review): written against the flat `dict[str, type]` form of this
    # field — confirm against the installed pymovements version.
    filename_format_dtypes: dict[str, type] = field(
        default_factory=lambda: {
            "subject_id": int,
            "stimulus_id": str,
        }
    )

    # Columns that together identify one trial.
    trial_columns: list[str] = field(
        default_factory=lambda: ["subject_id", "stimulus_id", "screen"]
    )

    time_column: str = "time"

    time_unit: str = "ms"

    # Flat x/y pixel columns as written to the CSV files.
    pixel_columns: list[str] = field(default_factory=lambda: ["pixel_x", "pixel_y"])


# # TODO: Read this from a metadata file
# experiment = pm.Experiment(
#     sampling_rate=2000,
#     screen_width_px=1275,
#     screen_height_px=916,
#     screen_width_cm=37,
#     screen_height_cm=28,
#     distance_cm=60,
# )

# Experiment setup: the sampling rate comes from the metadata of the parsed ASC
# file, while the screen geometry and viewing distance are still hard-coded.
experiment = pm.Experiment(
    sampling_rate=gazePatterns._metadata["sampling_rate"],
    screen_width_px=1275,
    screen_height_px=916,
    screen_width_cm=37,
    screen_height_cm=28,
    distance_cm=60,
)

# Reuse the notebook-wide output directory (the per-stimulus CSVs were written
# to its "raw" subdirectory) instead of re-spelling the path.
dataset = pm.Dataset(Multipleye(experiment=experiment), output_dir)

In [None]:
# Load the dataset described by the definition above.
dataset.load()

In [None]:
# Optional: keep only the first two gaze frames to avoid running out of memory
# in the event-property step below.
# dataset.gaze = dataset.gaze[:2]  # To avoid OOM
# Display the loaded dataset.
dataset

## Fixation and saccade detection

Notes:
- `compute_event_properties()` uses a lot of memory (https://github.com/aeye-lab/pymovements/issues/753). It's currently not possible to run it on the entire dataset on any ol' laptop.

In [None]:
# Velocities are computed with a Savitzky-Golay filter, as in
# https://doi.org/10.3758/BRM.42.1.188, using a window of about 50 ms.
samples_per_ms = experiment.sampling_rate / 1000
window_length = round(samples_per_ms * 50)  # 50 ms
# The window length must be odd: bump even values by one sample.
window_length += (window_length + 1) % 2

dataset.pix2deg().pos2vel(
    method="savitzky_golay",
    window_length=window_length,
    degree=2,
)

# Detect events with the "ivt" method, then add a "location" property computed
# from the pixel positions.
dataset.detect("ivt")
location_property = ("location", {"position_column": "pixel"})
dataset.compute_event_properties(location_property)

# Further detection steps, currently disabled:
# dataset.detect("fill", name="saccade")
# dataset.detect("microsaccades")
dataset.events

## Plots

### Gaze plot

In [None]:
import math

import matplotlib.pyplot as plt
import PIL.Image
import polars as pl
from matplotlib.patches import Circle

# Which gaze frame of the dataset and which screen of that trial to plot.
trial = 0
screen = "page_2"

# Row filter shared by the sample and the event selection.
on_selected_screen = pl.col("screen") == screen

# Raw samples recorded on the selected screen, with the pixel position split
# into x and y.
gaze_df = (
    dataset.gaze[trial]
    .frame.filter(on_selected_screen)
    .select(
        pl.col("screen"),
        pl.col("pixel").list.get(0).alias("pixel_x"),
        pl.col("pixel").list.get(1).alias("pixel_y"),
    )
)

# Fixations on the selected screen, with their duration and location.
event_df = (
    dataset.events[trial]
    .frame.filter((pl.col("name") == "fixation") & on_selected_screen)
    .select(
        pl.col("screen"),
        pl.col("duration"),
        pl.col("location").list.get(0).alias("pixel_x"),
        pl.col("location").list.get(1).alias("pixel_y"),
    )
)

fig, ax = plt.subplots()

# NOTE(review): this path points at the HR_CH stimulus set and a fixed stimulus
# (enc_wikimoon_id13), whereas the session loaded at the top of the notebook is
# SQ_CH and `stim_dir` is defined but unused — confirm that the image actually
# belongs to `dataset.gaze[trial]`.
stimulus_image = PIL.Image.open(
    f"../data/stimuli_MultiplEYE_HR_CH_Zurich_1_2025/stimuli_images_hr_ch_1/enc_wikimoon_id13_{screen}_hr.png"
)
ax.imshow(stimulus_image)

# Thin, semi-transparent scan path of the raw samples.
plt.plot(
    gaze_df["pixel_x"], gaze_df["pixel_y"], color="black", linewidth=0.5, alpha=0.3
)

# One circle per fixation; the radius is the square root of its duration.
fixation_rows = event_df.select("pixel_x", "pixel_y", "duration").iter_rows()
for x, y, duration in fixation_rows:
    ax.add_patch(
        Circle(
            (x, y),
            math.sqrt(duration),
            color="blue",
            fill=True,
            alpha=0.5,
            zorder=10,
        )
    )

# Fit the axes to the screen; the y-axis is inverted so the origin is top-left.
ax.set_xlim((0, experiment.screen.width_px))
ax.set_ylim((experiment.screen.height_px, 0))