In [None]:
import os
import re
import numpy as np
import pandas as pd
from scipy.stats import skew, kurtosis, iqr
from config.config_loader import load_config

# Load configuration file for feature engineering
CONFIG_DATA = load_config(os.path.join("config", "feature_engineering_config.json"))


def read_txt(path):
    """
    Read driving behavior data from TXT file.
    Fields:
        timestamp, steering, throttle, brake
    Additional features:
        First and second derivatives (rate of change)
    """
    with open(path, 'r', encoding='utf-8') as f:
        contents = f.readlines()

    # Extract values from structured format
    datas = [re.findall(r'.*= (.*)  .*= (.*)  .*= (.*)  .*= (.*)', line)[0]
             for line in contents]
    
    datas = pd.DataFrame(datas, columns=['timestamp', 'steering', 'throttle', 'brake'],
                         dtype=float)

    # First and second derivatives of steering
    datas['steering_derivative'] = np.gradient(datas['steering'], datas['timestamp'])
    datas['steering_second_derivative'] = np.gradient(datas['steering_derivative'],
                                                      datas['timestamp'])

    # First and second derivatives of throttle
    datas['throttle_derivative'] = np.gradient(datas['throttle'], datas['timestamp'])
    datas['throttle_second_derivative'] = np.gradient(datas['throttle_derivative'],
                                                      datas['timestamp'])

    # First and second derivatives of brake
    datas['brake_derivative'] = np.gradient(datas['brake'], datas['timestamp'])
    datas['brake_second_derivative'] = np.gradient(datas['brake_derivative'],
                                                   datas['timestamp'])

    return datas


def read_speed_data(path, start_timestamp):
    """
    Read speed data from TXT file.
    Handles missing values such as 'null'.
    Computes first and second derivatives of speed.
    """
    with open(path, 'r', encoding='utf-8') as f:
        contents = f.readlines()

    speed_data = []
    for line in contents:
        match = re.findall(r'(.*) (.*)', line)
        if match:
            timestamp, speed = match[0]

            # Convert missing speed to nan
            speed = np.nan if speed == 'null' else int(speed)

            # Synchronize timestamp
            timestamp = int(timestamp) / 25 + start_timestamp

            speed_data.append((timestamp, speed))

    speed_data = pd.DataFrame(speed_data, columns=['timestamp', 'speed'])

    # First and second derivatives of speed
    speed_data['speed_derivative'] = np.gradient(speed_data['speed'],
                                                 speed_data['timestamp'])
    speed_data['speed_second_derivative'] = np.gradient(speed_data['speed_derivative'],
                                                        speed_data['timestamp'])

    return speed_data


def csv_reader(name, start_time):
    """
    Read eye-tracking data from CSV file.
    Selected features:
        - gaze point coordinates
        - head position (derived from eye positions)
        - pupil size
        - eye movement type
    
    Additional computed features:
        - gaze velocity and acceleration
        - head motion velocity and acceleration
    """
    columns_to_read = [
        'Recording timestamp',
        'Event', 'Recording date', 'Recording start time',
        'Validity left', 'Validity right',
        'Gaze point X', 'Gaze point Y',
        'Gaze point left X', 'Gaze point left Y',
        'Gaze point right X', 'Gaze point right Y',
        'Eye position left X (DACSmm)', 'Eye position left Y (DACSmm)', 'Eye position left Z (DACSmm)',
        'Eye position right X (DACSmm)', 'Eye position right Y (DACSmm)', 'Eye position right Z (DACSmm)',
        'Eye movement type', 'Gaze event duration',
        'Pupil diameter left', 'Pupil diameter right'
    ]

    df = pd.read_csv(name, usecols=columns_to_read, low_memory=False)

    # Select required columns and rename
    df_new = df[['Recording timestamp', 'Gaze point X', 'Gaze point Y',
                 'Eye position left X (DACSmm)', 'Eye position left Y (DACSmm)',
                 'Eye position left Z (DACSmm)', 'Eye position right X (DACSmm)',
                 'Eye position right Y (DACSmm)', 'Eye position right Z (DACSmm)']].copy()

    df_new.columns = ['timestamp', 'Gaze point X', 'Gaze point Y',
                      'Head left X', 'Head left Y', 'Head left Z',
                      'Head right X', 'Head right Y', 'Head right Z']

    # Convert timestamp to seconds and synchronize
    df_new['timestamp'] = df_new['timestamp'] / 1e6 + start_time

    # Gaze velocity and acceleration (X, Y)
    for axis in ['X', 'Y']:
        df_new[f'Gaze_{axis}_vel'] = np.gradient(df_new[f'Gaze point {axis}'],
                                                 df_new['timestamp'])
        df_new[f'Gaze_{axis}_acc'] = np.gradient(df_new[f'Gaze_{axis}_vel'],
                                                 df_new['timestamp'])

    # Eye movement speed and acceleration (euclidean)
    df_new['eye_speed'] = np.sqrt(df_new['Gaze point X'].diff()**2 +
                                  df_new['Gaze point Y'].diff()**2) / df_new['timestamp'].diff()

    df_new['eye_acceleration'] = np.gradient(df_new['eye_speed'], df_new['timestamp'])

    # Compute head center position
    df['Head X'] = df[['Eye position left X (DACSmm)', 'Eye position right X (DACSmm)']].mean(axis=1)
    df['Head Y'] = df[['Eye position left Y (DACSmm)', 'Eye position right Y (DACSmm)']].mean(axis=1)
    df['Head Z'] = df[['Eye position left Z (DACSmm)', 'Eye position right Z (DACSmm)']].mean(axis=1)

    # Head velocity and acceleration (X, Y, Z)
    for axis in ['X', 'Y', 'Z']:
        df_new[f'Head_vel_{axis}'] = np.gradient(df[f'Head {axis}'].values,
                                                 df_new['timestamp'])
        df_new[f'Head_acc_{axis}'] = np.gradient(df_new[f'Head_vel_{axis}'],
                                                 df_new['timestamp'])

    # Combined velocity and acceleration
    df_new['Head_vel_total'] = np.sqrt(df_new['Head_vel_X']**2 +
                                       df_new['Head_vel_Y']**2 +
                                       df_new['Head_vel_Z']**2)

    df_new['Head_acc_total'] = np.sqrt(df_new['Head_acc_X']**2 +
                                       df_new['Head_acc_Y']**2 +
                                       df_new['Head_acc_Z']**2)

    # Additional raw signals
    df_new['Pupil_diameter_left'] = df['Pupil diameter left']
    df_new['Pupil_diameter_right'] = df['Pupil diameter right']
    df_new['Eye movement type'] = df['Eye movement type']

    # Debug outlier detection for eye movement speed
    large_values = df_new[df_new['eye_speed'].abs() > 100000]
    if not large_values.empty:
        print("Detected extremely large eye movement speeds:")
        print(large_values[['eye_speed', 'timestamp']])

    return df_new


In [None]:
import numpy as np

def hampel_filter_forloop_numba(input_series, window_size, n_sigmas=3):
    """
    Hampel filter for outlier detection and replacement.
    Robust to non-Gaussian noise because it uses median and MAD.
    
    Args:
        input_series (np.ndarray): 1D signal array.
        window_size (int): Half window size for sliding median filter.
        n_sigmas (int or float): Threshold multiplier for MAD-based scale.
        
    Returns:
        new_series (np.ndarray): Filtered signal with outliers replaced.
        indices (list): Indices where outliers were detected.
    """
    n = len(input_series)
    new_series = input_series.copy()
    k = 1.4826  # scale factor converting MAD to std for Gaussian assumption
    indices = []

    # Loop through all valid center indices
    for i in range(window_size, (n - window_size)):
        # Compute the local median within the window
        x0 = np.nanmedian(input_series[(i - window_size):(i + window_size)])
        
        # Compute the Median Absolute Deviation (MAD) and scale to std estimate
        window_data = input_series[(i - window_size):(i + window_size)]
        S0 = k * np.nanmedian(np.abs(window_data - x0))
        
        # Check if the current point deviates more than n_sigmas * S0
        if np.abs(input_series[i] - x0) > n_sigmas * S0:
            # Replace detected outliers with the local median
            new_series[i] = x0
            indices.append(i)

    return new_series, indices

In [None]:
from pathlib import Path
import pandas as pd
import numpy as np

RAW_DATA_DIR = Path("raw_data")

# Columns to interpolate (numeric sensor signals)
VALUE_COLS_TO_INTERPOLATE = [
    "Gaze point left X", "Gaze point left Y",
    "Gaze point right X", "Gaze point right Y",
    "Eye position left X (DACSmm)", "Eye position left Y (DACSmm)", "Eye position left Z (DACSmm)",
    "Eye position right X (DACSmm)", "Eye position right Y (DACSmm)", "Eye position right Z (DACSmm)",
    "Pupil diameter left", "Pupil diameter right"
]


def preprocess_and_interpolate_file(path: Path, save_processed: bool = False):
    """
    Data preprocessing pipeline for eye-tracking CSV files.

    Processing workflow:
    1. Remove rows before the event `ScreenRecordingStart`
    2. Rows with any non-empty event → set numeric value columns to NaN
    3. Rows where both eyes are marked as `Invalid` → set numeric value columns to NaN
    4. Convert value columns to numeric, forcing invalid strings to NaN
    5. Interpolate short missing gaps (≤ 5 samples) only
    6. Forward/backward fill remaining boundaries
    7. Apply Hampel filter to remove outliers
    8. Recompute the binocular gaze point `Gaze point X/Y` as average of left/right

    Returns:
        df (pd.DataFrame): Processed DataFrame
        loss_rate (float): Invalid data ratio before interpolation
        total_outlier_count (int): Number of values replaced by Hampel filter
        outlier_indices_by_column (dict): Outlier indices for each column
    """
    outlier_indices_by_column = {}

    df = pd.read_csv(path, dtype=str, low_memory=False)

    # Keep data only after `ScreenRecordingStart`
    calibration_start_idx = df[df["Event"] == "ScreenRecordingStart"].index[0]
    df = df.iloc[calibration_start_idx + 1:].copy()
    df.reset_index(drop=True, inplace=True)

    # Keep only rows produced by the eye tracker sensor
    df = df[df["Sensor"] == "Eye Tracker"].copy()
    df.reset_index(drop=True, inplace=True)

    total_count = len(df)

    # Non-empty event rows → considered unreliable → set as NaN
    event_nonempty_mask = df["Event"].notna() & (df["Event"].str.strip() != "")
    df.loc[event_nonempty_mask, VALUE_COLS_TO_INTERPOLATE] = np.nan

    # Both eyes invalid → treat measurements as missing
    invalid_mask = (df["Validity left"] == "Invalid") & (df["Validity right"] == "Invalid")
    df.loc[invalid_mask, VALUE_COLS_TO_INTERPOLATE] = np.nan

    invalid_count = event_nonempty_mask.sum() + invalid_mask.sum()
    loss_rate = invalid_count / total_count

    # Convert all value columns to numeric, coercing any errors into NaN
    for col in VALUE_COLS_TO_INTERPOLATE:
        df[col] = pd.to_numeric(df[col], errors="coerce")

    # Limit interpolation to short NaN gaps only
    max_gap = 5

    for col in VALUE_COLS_TO_INTERPOLATE:
        s = df[col]
        is_na = s.isna()

        # Identify consecutive NaN segments
        seg_id = (is_na != is_na.shift()).cumsum()
        groups = s.groupby(seg_id)

        new_s = s.copy()

        for _, g in groups:
            if g.isna().all():
                gap_len = len(g)
                # Interpolate only if gap is short enough
                if gap_len <= max_gap:
                    new_s[g.index] = s.interpolate(method="linear", limit_direction="both")[g.index]
                else:
                    new_s[g.index] = np.nan

        df[col] = new_s

    # Boundary fill to handle beginning and end NaNs
    df[VALUE_COLS_TO_INTERPOLATE] = (
        df[VALUE_COLS_TO_INTERPOLATE].ffill().bfill()
    )

    # Hampel filter for robust outlier detection
    total_outlier_count = 0
    for col in VALUE_COLS_TO_INTERPOLATE:
        series = df[col].to_numpy()
        filtered_series, indices = hampel_filter_forloop_numba(
            series, window_size=10, n_sigmas=3
        )
        df[col] = filtered_series
        total_outlier_count += len(indices)
        outlier_indices_by_column[col] = indices

    # Recompute gaze coordinates as binocular average
    df["Gaze point X"] = df[["Gaze point left X", "Gaze point right X"]].mean(axis=1)
    df["Gaze point Y"] = df[["Gaze point left Y", "Gaze point right Y"]].mean(axis=1)

    return df, loss_rate, total_outlier_count, outlier_indices_by_column


In [None]:
import json
from pathlib import Path
import pandas as pd
import numpy as np

# === Load configuration file ===
CONFIG_PATH = Path("config") / "feature_engineering_config.json"
with CONFIG_PATH.open(encoding="utf-8") as f:
    config = json.load(f)


def batch_process_csv_interpolation():
    """
    Batch processing pipeline:
    - Iterate through configured dataset groups and experiments
    - Apply preprocessing and interpolation function for each file
    - Save cleaned outputs into target directories
    """

    for group_key in ["GroupA_data_path", "GroupB_data_path"]:
        group_data = config.get(group_key, {})
        print(f"\n[INFO] Processing dataset group: {group_key}")

        for ex in ["ex1", "ex2"]:
            in_key = f"csv_path_{ex}"
            out_key = f"csv_path_{ex}_cleaned"

            input_list = group_data.get(in_key, [])
            output_list = group_data.get(out_key, [])

            print(f"[INFO] Experiment {ex}: {len(input_list)} files to process")

            for in_path_str, out_path_str in zip(input_list, output_list):
                if not in_path_str:
                    continue

                in_path = Path(in_path_str)
                out_path = Path(out_path_str)

                # Apply preprocessing
                df, loss_rate, n_outliers, outlier_indices_by_column = \
                    preprocess_and_interpolate_file(in_path)

                print("[INFO] Hampel filter applied")

                # Ensure output directory exists
                out_path.parent.mkdir(parents=True, exist_ok=True)

                # Save processed output
                df.to_csv(out_path, index=False)
                print(
                    f"[INFO] Saved cleaned file: {out_path} "
                    f"(invalid rate: {loss_rate:.2%}, outliers: {n_outliers})"
                )


if __name__ == "__main__":
    batch_process_csv_interpolation()


In [None]:
import os
import pandas as pd

# Root directory to save merged results
SAVE_ROOT = "data_pro1"
os.makedirs(SAVE_ROOT, exist_ok=True)

# Dataset groups and participant counts (keep original identifiers: NC / PD)
groups = {
    "NC": CONFIG_DATA["NC_number"],
    "PD": CONFIG_DATA["PD_number"]
}

# Experiment identifiers
ex_num_list = ["ex1", "ex2"]

# Sensor data modalities
sensor_types = ["pedal", "speed", "eyemovement"]


for group, person_count in groups.items():
    data_path_key = f"{group}_data_path"

    for ex_num in ex_num_list:
        for sensor in sensor_types:

            print(f"[INFO] Processing group={group}, experiment={ex_num}, sensor={sensor}")

            merged_dfs = []

            for person_idx in range(person_count):

                # Special-case skip condition in original dataset
                if group == "PD" and ex_num == "ex1" and person_idx == 8:
                    continue

                # Retrieve required paths and timestamps from config
                pedal_data_path = CONFIG_DATA[data_path_key][f"pedal_path_{ex_num}"][person_idx]
                speed_data_path = CONFIG_DATA[data_path_key][f"speed_path_{ex_num}"][person_idx]
                start_timestamp = CONFIG_DATA[data_path_key][f"video_start_timestamp_{ex_num}"][person_idx]
                em_csv_path = CONFIG_DATA[data_path_key][f"csv_path_{ex_num}_cleaned"][person_idx]

                # Load sensor data based on type
                if sensor == "pedal":
                    df = read_txt(pedal_data_path)
                elif sensor == "speed":
                    df = read_speed_data(speed_data_path, start_timestamp)
                elif sensor == "eyemovement":
                    df = csv_reader(em_csv_path, start_timestamp)

                # Assign participant ID (sequential index)
                df["person_id"] = person_idx + 1
                merged_dfs.append(df)

            # Merge all participants in the group
            result_df = pd.concat(merged_dfs, ignore_index=True)

            # File naming convention preserved
            save_filename = f"{group}_{sensor}_data_{ex_num}.csv"
            save_path = os.path.join(SAVE_ROOT, save_filename)

            # Persist output to disk
            result_df.to_csv(save_path, index=False)

            print(f"[INFO] Saved: {save_path}  (records={len(result_df)})")
