In [4]:
# # Eye Tracking Parameter Extraction using IVT Filter
#
# This notebook implements a minimal velocity-threshold identification (IVT) filter
# to segment gaze velocity data into fixations and saccades, then computes basic statistics.
#
# The code is intended as a starting point and minimal viable product (MVP) for eye tracking analysis.
# Feel free to adapt and improve it according to your needs.

import numpy as np

SPS = 50  # Samples per second (sampling rate)
DEFAULT_VELOCITY_THRESHOLD = 30  # degrees per second
MIN_FIXATION_DURATION_MS = 60
MIN_SACCADE_DURATION_MS = 20

def create_fixation_saccade_arrays(
    gaze_velocity: np.ndarray,
    velocity_threshold: float = DEFAULT_VELOCITY_THRESHOLD,
    min_fixation_duration_ms: int = MIN_FIXATION_DURATION_MS,
    min_saccade_duration_ms: int = MIN_SACCADE_DURATION_MS,
    sampling_rate_hz: int = SPS
) -> tuple[np.ndarray, np.ndarray]:
    """
    Classify each sample in gaze_velocity as fixation or saccade using an IVT filter.

    Parameters:
        gaze_velocity (np.ndarray): 1D array of gaze velocity values.
        velocity_threshold (float): Velocity threshold to distinguish fixations from saccades.
        min_fixation_duration_ms (int): Minimum fixation duration in ms.
        min_saccade_duration_ms (int): Minimum saccade duration in ms.
        sampling_rate_hz (int): Sampling rate of the data in Hz.

    Returns:
        fixation_array (np.ndarray): Binary array where 1 indicates fixation samples.
        saccade_array (np.ndarray): Binary array where 1 indicates saccade samples.
    """
    if gaze_velocity.ndim != 1:
        raise ValueError("gaze_velocity must be a 1D numpy array.")

    min_fixation_samples = int(min_fixation_duration_ms / (1000 / sampling_rate_hz))
    min_saccade_samples = int(min_saccade_duration_ms / (1000 / sampling_rate_hz))

    fixation_array = np.zeros(len(gaze_velocity), dtype=int)
    saccade_array = np.zeros(len(gaze_velocity), dtype=int)

    i = 0
    while i < len(gaze_velocity):
        if gaze_velocity[i] < velocity_threshold:
            fixation_start = i
            while i < len(gaze_velocity) and gaze_velocity[i] < velocity_threshold:
                i += 1
            fixation_end = i
            duration = fixation_end - fixation_start
            if duration >= min_fixation_samples:
                fixation_array[fixation_start:fixation_end] = 1
        else:
            saccade_start = i
            while i < len(gaze_velocity) and gaze_velocity[i] >= velocity_threshold:
                i += 1
            saccade_end = i
            duration = saccade_end - saccade_start
            if duration >= min_saccade_samples:
                saccade_array[saccade_start:saccade_end] = 1

    return fixation_array, saccade_array


def compute_statistics(
    fixation_array: np.ndarray,
    saccade_array: np.ndarray,
    gaze_velocity: np.ndarray,
    sampling_rate_hz: int = SPS
) -> tuple[int, float, float, int, float, float]:
    """
    Compute basic eye-tracking statistics from fixation and saccade arrays.

    Parameters:
        fixation_array (np.ndarray): Binary array indicating fixation samples.
        saccade_array (np.ndarray): Binary array indicating saccade samples.
        gaze_velocity (np.ndarray): Array of gaze velocity values.
        sampling_rate_hz (int): Sampling rate of the data (Hz).

    Returns:
        fixation_count (int): Number of detected fixations.
        avg_fixation_duration_ms (float): Average fixation duration in milliseconds.
        avg_fixation_velocity (float): Average velocity during fixations.
        saccade_count (int): Number of detected saccades.
        avg_saccade_duration_ms (float): Average saccade duration in milliseconds.
        avg_saccade_velocity (float): Average velocity during saccades.
    """
    if not (fixation_array.shape == saccade_array.shape == gaze_velocity.shape):
        raise ValueError("Input arrays must have the same shape.")

    fixation_duration_samples = fixation_array.sum()
    saccade_duration_samples = saccade_array.sum()

    # Count number of fixations and saccades (transitions from 0 to 1)
    fixation_count = np.count_nonzero(np.diff(np.concatenate(([0], fixation_array))) == 1)
    saccade_count = np.count_nonzero(np.diff(np.concatenate(([0], saccade_array))) == 1)

    avg_fixation_duration_ms = (
        (fixation_duration_samples / fixation_count) * (1000 / sampling_rate_hz)
        if fixation_count > 0 else 0.0
    )
    avg_saccade_duration_ms = (
        (saccade_duration_samples / saccade_count) * (1000 / sampling_rate_hz)
        if saccade_count > 0 else 0.0
    )

    fixation_velocities = gaze_velocity[fixation_array == 1]
    avg_fixation_velocity = fixation_velocities.mean() if fixation_velocities.size > 0 else 0.0

    saccade_velocities = gaze_velocity[saccade_array == 1]
    avg_saccade_velocity = saccade_velocities.mean() if saccade_velocities.size > 0 else 0.0

    return (
        fixation_count,
        avg_fixation_duration_ms,
        avg_fixation_velocity,
        saccade_count,
        avg_saccade_duration_ms,
        avg_saccade_velocity
    )

# Example: dummy gaze velocity data (degrees per second)
example_velocity = np.array([5, 8, 12, 35, 40, 38, 100, 89, 48, 50, 52, 8, 10, 22, 11])

# Run IVT segmentation
fix_array, sac_array = create_fixation_saccade_arrays(example_velocity)

# Compute statistics
stats = compute_statistics(fix_array, sac_array, example_velocity)

print("Fixation array:", fix_array)
print("Saccade array:", sac_array)
print(
    "Results (fixation_count, avg_fixation_duration_ms, avg_fixation_velocity, "
    "saccade_count, avg_saccade_duration_ms, avg_saccade_velocity):"
)
print(stats)

Fixation array: [1 1 1 0 0 0 0 0 0 0 0 1 1 1 1]
Saccade array: [0 0 0 1 1 1 1 1 1 1 1 0 0 0 0]
Results (fixation_count, avg_fixation_duration_ms, avg_fixation_velocity, saccade_count, avg_saccade_duration_ms, avg_saccade_velocity):
(2, np.float64(70.0), np.float64(10.857142857142858), 1, np.float64(160.0), np.float64(56.5))
