# Prepping raw eye/head-tracking data recorded with the HL2
**Data was recorded with the in-house developed MR-OST-HMD application, which simulates the HINTS exam.** <br>
*Note: all data here comes from the same healthy subject; we do not intend to share patient data at this stage.*

### From raw recorded data to sorted data:
The raw data (txt) files saved by the Unity application will be transformed to multiple CSV files.<br>
*Per subject, one CSV file per part of the examination:* 
- (1) Freely looking around in the room.
- (2) Looking at the floor. 
- (3) Looking at the ceiling. 
- (4) Looking at the nose of the examiner (at approximately 40 cm).
- (5) Looking left of the examiner's nose (at approximately 40 cm).
- (6) Looking right of the examiner's nose (at approximately 40 cm).
- (7) Looking forward.
- (8) During the head impulse test (HIT) while focusing on a black dot on the wall at approximately 40 cm.

The CSV files are also grouped per class (symptoms diagnosed during conventional clinical exam). <br>
*Please note that this depends on the folder structure of the raw data.* <br>
*Not adhering to our structure will result in all subjects being stored in the 'Other' folder.*
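
As a minimal sketch of that rule, assuming the layout `<raw root>/<class>/<subject>/<recording>.txt` that the extraction cell in this notebook relies on: the class is the folder two levels above the recording, and anything else falls back to 'Other'. The function name `class_from_path` and the example paths are hypothetical and only serve as illustration.

```python
from pathlib import PurePosixPath

CLASSES = ['Healthy', 'Skew', 'Saccades', 'NystagmusLeft',
           'NystagmusRight', 'NystagmusDownbeating', 'Other']


def class_from_path(txt_path: str) -> str:
    """Return the class folder two levels above the file, or 'Other'."""
    parts = PurePosixPath(txt_path).parts
    # parts[-1] is the file, parts[-2] the subject folder,
    # parts[-3] the folder that is expected to name the class
    folder = parts[-3] if len(parts) >= 3 else ''
    return folder if folder in CLASSES else 'Other'


# Hypothetical paths: the first follows the expected structure, the second does not
print(class_from_path('RawData/Skew/subject01/2024_01_24_09_32_48_log.txt'))
print(class_from_path('RawData/subject01/2024_01_24_09_32_48_log.txt'))
```

The first call resolves to 'Skew'; the second has no class folder in the expected position and therefore ends up in 'Other'.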

Please also note that, since this is a public GitHub repository, the data is taken from a single healthy subject. <br>
In other words, even though we have seven classes, in this case every class contains the same healthy subject. <br>
The main goal is to make the data processing transparent and to improve reproducibility.

### Processed data to features and labels
The processed data will now be transformed to features and labels in the *'hints_features_labels'* folder.<br>
In the *'hints_features_labels'* folder, there will be subfolders for each part of the exam, see above. <br>
In each subfolder, you will find the *'features'* and *'labels'* folder with all the subjects ordered from 0, 1, ..., n. <br> 
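
A minimal sketch of reading one such pair back, assuming the layout written by the last cell of this notebook: `<root>/<exam part>/features/<index>.csv` holds the feature table with a header row, and `<root>/<exam part>/labels/<index>.csv` holds a single integer label. The helper name `load_sample` is hypothetical, and the standard-library `csv` module is used only to keep the sketch dependency-free.

```python
import csv
from pathlib import Path
from typing import Dict, List, Tuple


def load_sample(
    root: Path, action: str, index: int
) -> Tuple[List[Dict[str, str]], int]:
    """Read the features and label of one subject for one exam part."""
    features_file = root / action / 'features' / f'{index}.csv'
    labels_file = root / action / 'labels' / f'{index}.csv'

    # Each feature row becomes a dict keyed by column name (values stay strings)
    with features_file.open(newline='') as handle:
        rows = list(csv.DictReader(handle))

    # The label file is assumed to contain nothing but the class index
    label = int(labels_file.read_text().strip())
    return rows, label
```

For example, `load_sample(Path('../data/hints_features_labels'), 'nase', 0)` would return the rows and label of subject 0 for the nose-fixation part, provided the processing cells have been run first.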

**The labels indicate the following classes:**
- 0 -> Healthy
- 1 -> Skew
- 2 -> Saccades
- 3 -> NystagmusLeft
- 4 -> NystagmusRight
- 5 -> NystagmusDownbeating 
- 6 -> Other

*Names for reference: ['Healthy', 'Skew', 'Saccades', 'NystagmusLeft', 'NystagmusRight', 'NystagmusDownbeating', 'Other']*<br>
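
The mapping above can be sketched as a pair of lookup tables, so that a numeric label can be turned back into a readable class name and vice versa. The variable names `LABEL_TO_NAME` and `NAME_TO_LABEL` are illustrative and do not appear elsewhere in this notebook.

```python
CLASS_NAMES = ['Healthy', 'Skew', 'Saccades', 'NystagmusLeft',
               'NystagmusRight', 'NystagmusDownbeating', 'Other']

# The position in the list is the label: 0 -> Healthy, ..., 6 -> Other
LABEL_TO_NAME = dict(enumerate(CLASS_NAMES))
NAME_TO_LABEL = {name: label for label, name in LABEL_TO_NAME.items()}

print(LABEL_TO_NAME[3])        # NystagmusLeft
print(NAME_TO_LABEL['Other'])  # 6
```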

## Order of cells
1. Find the path of the most up-to-date raw data (.txt) file per subject. <br><br>
2. Create a main data folder (*'data/csv_per_patient_per_measurement_action'*) with subfolders per class, each with a subfolder per patient. <br>
Then create a .csv file containing the relevant parts of the raw data for each part of the examination. <br><br>
3. Create a features and labels folder (*'data/hints_features_labels'*) with subfolders for each part of the examination. <br>
Inside the subfolders, create a *'features'* and a *'labels'* folder containing clean and sorted data of the eye- and head-tracking measurements. <br><br>
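
To illustrate what "clean" means in step 3, here is a minimal sketch of how a single coordinate cell could be split into numeric components. It assumes cells are stored either as a string such as `(x, y, z)` or as the literal `NotAvailable`; the helper name `split_xyz` is hypothetical, and the notebook itself performs this step column-wise with pandas.

```python
import math
from typing import Tuple


def split_xyz(cell: str) -> Tuple[float, float, float]:
    """Turn a '(x, y, z)' cell into three floats, or NaNs if unavailable."""
    missing = (math.nan, math.nan, math.nan)
    parts = cell.strip('()').split(', ')
    if len(parts) != 3:
        return missing
    try:
        x, y, z = (float(part) for part in parts)
    except ValueError:
        # Non-numeric content is treated the same way as a missing value
        return missing
    return (x, y, z)


print(split_xyz('(0.1, -0.25, 1.5)'))  # (0.1, -0.25, 1.5)
print(split_xyz('NotAvailable'))       # (nan, nan, nan)
```

Mapping unavailable samples to NaN rather than dropping them keeps the rows aligned with their timestamps.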

*Note that the application was made in Unity.*<br>
*The extended eye tracking library for the HL2 is used for eye-tracking measurements per eye,*<br>
*and the Transform information of the headset for the head measurements.*


In [None]:
'''Get the most recent .txt files from each patient/subject (i.e. per directory)
The HL2 unity application can save multiple .txt files per patient/subject.
Therefore we only keep the most recent one, which includes the most tracking data, and has the most recent timestamp.
'''

# Standard library imports
import datetime
import os
import re
from typing import Dict, List, Tuple

# extract path of the most recent .txt files from each patient/subject directory
def extract_latest_txt_files(
    data_directory: str,
    classes: List[str]
) -> Tuple[Dict[str, List[str]], int]:
    """Extract paths of the most recent .txt files from each class directory.

    Walks through the directory tree to find the most recent .txt file in each
    subdirectory, then organizes them by class based on the directory structure.

    Args:
        data_directory: Root directory containing the raw data files
        classes: List of class names to organize files into

    Returns:
        A tuple containing:
            - Dictionary mapping class names to lists of file paths
            - Total count of .txt files found
    """
    txt_file_paths = []
    total_txt_files = 0

    # Walk through directory tree
    for root, _, filenames in os.walk(data_directory):
        newest_file = None
        newest_time = None

        # Find newest .txt file in current directory
        for filename in filenames:
            if not filename.endswith('.txt'):
                continue

            total_txt_files += 1
            file_path = os.path.join(root, filename)

            # Parse timestamp from filename 
            # (format: 2024_01_24_09_32_48_longmessage.txt)
            timestamp_match = re.search(
                r'\d{4}_\d{2}_\d{2}_\d{2}_\d{2}_\d{2}',
                filename
            )
            if not timestamp_match:
                continue

            timestamp = datetime.datetime.strptime(
                timestamp_match.group(),
                '%Y_%m_%d_%H_%M_%S'
            )
            if newest_time is None or timestamp > newest_time:
                newest_file = file_path
                newest_time = timestamp

        if newest_file:
            txt_file_paths.append(newest_file)

    # Organize files by class
    txt_paths = {folder_name: [] for folder_name in classes}
    
    for file_path in txt_file_paths:
        # Extract class name from directory structure
        folder_name = os.path.basename(
            os.path.dirname(os.path.dirname(file_path))
        )
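        if folder_name in classes:
            txt_paths[folder_name].append(file_path)
        else:
            # Fall back to 'Other'; setdefault avoids a KeyError when
            # 'Other' is not part of the supplied class list
            txt_paths.setdefault('Other', []).append(file_path)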

    return txt_paths, total_txt_files


# Define supported classes
CLASSES = [
    'Healthy',
    'Skew',
    'Saccades',
    'NystagmusLeft',
    'NystagmusRight',
    'NystagmusDownbeating',
    'Other'
]

# Set data directory path
DATA_DIRECTORY = '../RawData'

# Extract file paths and get statistics
txt_paths, total_txt_files = extract_latest_txt_files(DATA_DIRECTORY, CLASSES)

# Print summary statistics (the per-class loop below lists the actual paths)
print('Total txt files found:', total_txt_files)
print(
    'Total of unique newest txt files found:',
    sum(len(paths) for paths in txt_paths.values())
)

for class_name in CLASSES:
    print(f'{class_name}:', len(txt_paths[class_name]), txt_paths[class_name])

In [None]:
''' Create an output folder 'csv_per_patient_per_measurement_action', that has:
- Structure 'classes = ['Healthy', 'Skew', 'Saccades', 'NystagmusLeft', 'NystagmusRight', 'NystagmusDownbeating', 'Other']'
- Folders per subject/patient inside classes folder.
- .csv file per 'measurement_actions = ['beforeStart', 'startRecording', 'raum', 'still', 'nase', 'links', 'rechts', 'decke', 'bodem', 'blingHeadTest']'
- .csv file has headers which are the different features 'timestamp, worldLeftEyeCoordinates, headPosition, ...'
Use the paths of latest .txt file per patient 'txt_paths' from previous block.
'''

# Standard library imports
import os
from datetime import datetime
import re
from pathlib import Path
from typing import List, Tuple, Any

# Third-party imports
import pandas as pd
from tqdm import tqdm


def convert_subject_data_to_csv(
    category: str,
    path_measurements: str,
    output_base_path: str
) -> None:
    """Convert raw subject data from txt to multiple CSV files.

    Processes a single subject's raw data file and splits it into multiple CSV
    files, one for each measurement action (e.g., raum, still, nase, etc.).
    Handles coordinate data in various formats and saves them in a structured
    directory hierarchy.

    Args:
        category: Subject category (e.g., 'Healthy', 'Skew', etc.)
        path_measurements: Path to the raw data txt file
        output_base_path: Base path where processed CSV files will be saved

    Directory structure created:
        output_base_path/
            category/
                patient_id/
                    measurement_action.csv
    """
    def _create_output_dirs(base_path: str, category: str,
                          patient_id: str) -> str:
        """Create necessary output directories if they don't exist."""
        paths = [
            base_path,
            os.path.join(base_path, category),
            os.path.join(base_path, category, patient_id)
        ]
        for path in paths:
            os.makedirs(path, exist_ok=True)
        return paths[-1]

    def _parse_timestamp(timestamp_str: str) -> datetime | None:
        """Parse timestamp string to datetime object."""
        if any(char.isalpha() for char in timestamp_str):
            return None
        return datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S.%f')

    def _process_coordinate_data(matches: List[str]) -> List[Any]:
        """Process coordinate data from regex matches."""
        values = []
        for i, match in enumerate(matches):
            parts = match.split(', ')
            # Last group is quaternion (4 values), others are coordinates (3 values)
            group_size = 4 if i == len(matches) - 1 else 3
            
            for j in range(0, len(parts), group_size):
                group = parts[j:j + group_size]
                if 'NotAvailable' in group:
                    values.append('NotAvailable')
                else:
                    values.append(tuple(float(part) for part in group))
        return values

    # Define measurement actions in order
    MEASUREMENT_ACTIONS = [
        'beforeStart',
        'startRecording',
        'raum',
        'still',
        'nase',
        'links',
        'rechts',
        'decke',
        'bodem',
        'blingHeadTest'
    ]

    # Setup output directory structure
    patient_id = os.path.basename(os.path.dirname(path_measurements))
    output_dir = _create_output_dirs(output_base_path, category, patient_id)

    # Read and process the raw data
    df = pd.read_csv(path_measurements, delimiter="\t")
    headers = df.columns[0].split(', ')

    prepared_data = []
    measurement_type_counter = 0

    # Process each row in the dataframe
    for _, row in tqdm(df.iterrows(), total=len(df)):
        row_data = row.tolist()[0]  # Get first column which contains all data
        timestamp_str = row_data[:23]
        timestamp = _parse_timestamp(timestamp_str)

        # Save current measurement and start new one if timestamp is None
        if timestamp is None and prepared_data:
            measurement_df = pd.DataFrame(prepared_data, columns=headers)
            try:
                output_file = os.path.join(
                    output_dir,
                    f'{MEASUREMENT_ACTIONS[measurement_type_counter]}.csv'
                )
            except IndexError:
                output_file = os.path.join(
                    output_dir,
                    f'_extraSave_{measurement_type_counter}.csv'
                )
            measurement_df.to_csv(output_file, index=False)
            prepared_data = []
            measurement_type_counter += 1
            continue

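        # Process coordinate data
        if timestamp is not None:
            matches = re.findall(r'\((.*?)\)', row_data)
            values = [timestamp] + _process_coordinate_data(matches)
            prepared_data.append(values)

    # Save any rows still buffered after the last line, so the final
    # measurement is not dropped when the file does not end with a marker line
    if prepared_data:
        if measurement_type_counter < len(MEASUREMENT_ACTIONS):
            file_name = f'{MEASUREMENT_ACTIONS[measurement_type_counter]}.csv'
        else:
            file_name = f'_extraSave_{measurement_type_counter}.csv'
        pd.DataFrame(prepared_data, columns=headers).to_csv(
            os.path.join(output_dir, file_name), index=False
        )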


def process_all_subjects(
    txt_paths: dict,
    output_base_path: str
) -> None:
    """Process all subjects' data files.

    Args:
        txt_paths: Dictionary mapping categories to lists of file paths
        output_base_path: Base path for output CSV files
    """
    for class_name, paths in txt_paths.items():
        print(f'Starting converting for group: {class_name}')
        
        for txt_path in paths:
            convert_subject_data_to_csv(class_name, txt_path, output_base_path)
        
        output_category_path = os.path.join(output_base_path, class_name)
        print(f'Finished converting for group: {class_name}')
        print(f'Saved in: {output_category_path}\n')


# Example usage:
if __name__ == "__main__":
    OUTPUT_PATH = Path(r'../data/csv_per_patient_per_measurement_action')
    process_all_subjects(txt_paths, str(OUTPUT_PATH))

In [None]:
"""Process eye-tracking data and organize it into features and labels for ML.

This module handles the conversion of raw CSV data into structured features and
labels, organizing them by measurement action type and splitting coordinate data
into separate components.

measurement_actions = ['beforeStart', 'startRecording', 'raum', 'still', 'nase', 'links', 'rechts', 'decke', 'bodem', 'blingHeadTest']
"""

# Standard library imports
import os
from pathlib import Path
from typing import Dict, List, Optional

# Third-party imports
import numpy as np
import pandas as pd
from tqdm import tqdm


class DataProcessor:
    """Process and organize eye-tracking data for machine learning."""

    # Class-level constants with explicit label mapping
    CLASS_MAPPING = {
        'Healthy': 0,
        'Skew': 1,
        'Saccades': 2,
        'NystagmusLeft': 3,
        'NystagmusRight': 4,
        'NystagmusDownbeating': 5,
        'Other': 6
    }

    MEASUREMENT_ACTIONS = [
        'raum',
        'still',
        'nase',
        'links',
        'rechts',
        'decke',
        'bodem',
        'blingHeadTest'
    ]

    XYZ_FEATURES = [
        'worldLeftEyePosition',
        'worldRightEyePosition',
        'worldLeftEyeDirection',
        'worldRightEyeDirection',
        'worldCombinedEyePosition',
        'worldCombinedEyeDirection',
        'cameraLeftEyePosition',
        'cameraRightEyePosition',
        'cameraLeftEyeDirection',
        'cameraRightEyeDirection',
        'cameraCombinedEyePosition',
        'cameraCombinedEyeDirection',
        'headPosition',
        'headEulerAngles'
    ]

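    # The trailing space is kept as-is; it presumably mirrors the column name
    # in the raw file header, so check the CSV headers before removing it.
    QUATERNION_FEATURES = ['headQuaternion ']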

    def __init__(
        self,
        input_dir: str,
        output_dir: str
    ) -> None:
        """Initialize the data processor.

        Args:
            input_dir: Path to directory containing input CSV files
            output_dir: Path to directory where processed files will be saved
        """
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.measurement_counter = {
            action: 0 for action in self.MEASUREMENT_ACTIONS
        }
        
        # Verify input directory structure
        self._verify_input_structure()

    def _verify_input_structure(self) -> None:
        """Verify that the input directory structure matches expected classes."""
        if not self.input_dir.exists():
            raise ValueError(f"Input directory does not exist: {self.input_dir}")

        found_classes = {
            path.name for path in self.input_dir.iterdir() 
            if path.is_dir()
        }
        expected_classes = set(self.CLASS_MAPPING.keys())
        
        missing_classes = expected_classes - found_classes
        unexpected_classes = found_classes - expected_classes
        
        if missing_classes:
            raise ValueError(
                f"Missing expected class directories: {missing_classes}"
            )
        if unexpected_classes:
            raise ValueError(
                f"Found unexpected class directories: {unexpected_classes}"
            )

    def setup_directory_structure(self) -> None:
        """Create the output directory structure for features and labels."""
        for action in self.MEASUREMENT_ACTIONS:
            action_path = self.output_dir / action
            features_path = action_path / 'features'
            labels_path = action_path / 'labels'

            for path in [action_path, features_path, labels_path]:
                path.mkdir(parents=True, exist_ok=True)

    def process_coordinates(
        self,
        df: pd.DataFrame,
        feature: str,
        components: int
    ) -> pd.DataFrame:
        """Process coordinate data from string format to separate columns.

        Args:
            df: Input DataFrame containing the feature
            feature: Name of the feature to process
            components: Number of components (3 for xyz, 4 for quaternions)

        Returns:
            DataFrame with processed coordinate columns
        """
        try:
            # Split the string and convert to numeric values
            split_df = df[feature].str.strip('()').str.split(', ', expand=True)
            split_df = split_df.apply(pd.to_numeric, errors='coerce')

            # Handle missing columns (e.g. when every row is 'NotAvailable')
            for i in range(split_df.shape[1], components):
                split_df[i] = np.nan

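            # Assign new columns and drop original.
            # Quaternion components are labelled w, x, y, z here; verify this
            # against the order written by the recording application (Unity's
            # Quaternion.ToString() prints x, y, z, w).
            suffixes = ['w', 'x', 'y', 'z'] if components == 4 else ['x', 'y', 'z']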
            for i, suffix in enumerate(suffixes):
                df[f'{feature}_{suffix}'] = split_df[i]
            df.drop(feature, axis=1, inplace=True)

        except Exception as e:
            print(f"Error processing feature {feature}: {str(e)}")
            # Create NaN columns if processing fails
            suffixes = ['w', 'x', 'y', 'z'][:components] if components == 4 else ['x', 'y', 'z']
            for suffix in suffixes:
                df[f'{feature}_{suffix}'] = np.nan
            if feature in df.columns:
                df.drop(feature, axis=1, inplace=True)

        return df

    def process_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
        """Process the DataFrame by converting timestamps and coordinates.

        Args:
            df: Input DataFrame to process

        Returns:
            Processed DataFrame with converted timestamps and split coordinates
        """
        # Convert timestamp and calculate time differences
        df['timestamp'] = pd.to_datetime(
            df['timestamp'],
            format='%Y-%m-%d %H:%M:%S.%f'
        )
        df['delta_time'] = df['timestamp'].diff().dt.total_seconds().fillna(0)

        # Reorder columns
        df = df.reindex(columns=['timestamp', 'delta_time'] + [
            col for col in df.columns
            if col not in ['timestamp', 'delta_time']
        ])

        # Process xyz and quaternion features
        for feature in self.XYZ_FEATURES:
            if feature in df.columns:
                df = self.process_coordinates(df, feature, 3)
        for feature in self.QUATERNION_FEATURES:
            if feature in df.columns:
                df = self.process_coordinates(df, feature, 4)

        return df

    def process_files(self) -> None:
        """Process all CSV files and organize them into features and labels."""
        self.setup_directory_structure()
        print("Starting data processing...")

        # Track processed files for verification
        processed_files = {action: [] for action in self.MEASUREMENT_ACTIONS}

        # Walk through the input directory structure in sorted order, so that
        # the output indices are reproducible across platforms
        for class_path in sorted(self.input_dir.iterdir()):
            if not class_path.is_dir() or class_path.name not in self.CLASS_MAPPING:
                continue

            class_name = class_path.name
            label = self.CLASS_MAPPING[class_name]  # Use explicit mapping
            print(f"\nProcessing class: {class_name} (Label: {label})")

            # Process each patient directory (sorted for a stable index order)
            for patient_path in sorted(class_path.iterdir()):
                if not patient_path.is_dir():
                    continue

                print(f"Processing patient: {patient_path.name}")

                # Process each measurement file
                for measurement in self.MEASUREMENT_ACTIONS:
                    csv_path = patient_path / f"{measurement}.csv"
                    if not csv_path.exists():
                        continue

                    try:
                        # Read and process the file
                        df = pd.read_csv(csv_path)
                        df_processed = self.process_dataframe(df)

                        # Save processed data
                        counter = self.measurement_counter[measurement]
                        features_path = (
                            self.output_dir / measurement / 'features' / f'{counter}.csv'
                        )
                        labels_path = (
                            self.output_dir / measurement / 'labels' / f'{counter}.csv'
                        )

                        df_processed.to_csv(features_path, index=False)
                        labels_path.write_text(str(label))

                        # Track processed file
                        processed_files[measurement].append({
                            'source_path': csv_path,
                            'class': class_name,
                            'label': label,
                            'output_index': counter
                        })

                        self.measurement_counter[measurement] += 1
                        print(f"Processed: {measurement} (Label: {label})")

                    except Exception as e:
                        print(f"Error processing {csv_path}: {str(e)}")

        # Print verification summary
        print("\nProcessing verification:")
        for measurement, files in processed_files.items():
            print(f"\n{measurement}:")
            for file in files:
                print(f"  {file['source_path'].parent.parent.name} "
                      f"(Label: {file['label']}) -> {file['output_index']}.csv")


def main():
    """Main function to run the data processing pipeline."""
    # Define input and output directories
    input_dir = r'../data/csv_per_patient_per_measurement_action'
    output_dir = r'../data/hints_features_labels'

    # Initialize and run processor
    processor = DataProcessor(input_dir, output_dir)
    processor.process_files()

    # Print summary
    print("\nProcessing complete!")
    print("Files processed per measurement type:")
    for action, count in processor.measurement_counter.items():
        print(f"{action}: {count}")


if __name__ == "__main__":
    main()