<a href="https://colab.research.google.com/github/antonis00/EKPA/blob/main/GazebaseVRWithCorrectUserIdentification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pymovements
import pymovements as pm
dataset = pm.Dataset("GazeBaseVR", path='data/GazeBaseVR')
dataset.download()
dataset.load()

Collecting pymovements
  Downloading pymovements-0.18.0-py3-none-any.whl.metadata (7.9 kB)
Collecting matplotlib<3.9,>=3.8.0 (from pymovements)
  Downloading matplotlib-3.8.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.8 kB)
Downloading pymovements-0.18.0-py3-none-any.whl (164 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m164.4/164.4 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading matplotlib-3.8.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (11.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m11.6/11.6 MB[0m [31m102.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: matplotlib, pymovements
  Attempting uninstall: matplotlib
    Found existing installation: matplotlib 3.7.1
    Uninstalling matplotlib-3.7.1:
      Successfully uninstalled matplotlib-3.7.1
Successfully installed matplotlib-3.8.4 pymovements-0.18.0


Downloading https://figshare.com/ndownloader/files/38844024 to data/GazeBaseVR/downloads/gazebasevr.zip


gazebasevr.zip: 0.00B [00:00, ?B/s]

Checking integrity of gazebasevr.zip
Extracting gazebasevr.zip to data/GazeBaseVR/raw


  0%|          | 0/5020 [00:00<?, ?it/s]

<pymovements.dataset.dataset.Dataset at 0x7e3920a0bd90>

# Eye Movement-Based User Identification

## Introduction

This notebook demonstrates a machine learning approach to identify users based on their eye movement patterns across different tasks. We use data collected from participants performing five distinct eye-tracking (ET) tasks:

1. Vergence task (VRG)
2. Horizontal smooth pursuit task (PUR)
3. Video-viewing task (VID)
4. Self-paced reading task (TEX)
5. Random oblique saccade task (RAN)

## Data Processing and Feature Extraction

We extract a comprehensive set of features from the eye-tracking data for each task. The features are calculated both globally and for multiple time-based portions of each task.

### Velocity Calculation

Before feature extraction, we calculate the velocity of eye movements using the Savitzky-Golay filter:
 dataset.pos2vel(method='savitzky_golay', degree=2, window_length=7)
*italicised text*
 This method applies a Savitzky-Golay filter to smooth the position data and calculate velocities. It uses a polynomial of degree 2 and a window length of 7 samples, which helps to reduce noise while preserving the underlying signal characteristics.

### Extracted Features

For each portion and globally:

1. **Velocity Statistics**:
   - Mean velocity
   - Standard deviation of velocity
   - Maximum velocity
   - Skewness of velocity distribution
   - Kurtosis of velocity distribution

2. **Position Statistics**:
   - Mean X and Y positions
   - Standard deviation of X and Y positions

3. **Target Position** (if available):
   - Mean X and Y target positions
   - Standard deviation of X and Y target positions

4. **Saccade and Fixation Metrics**:
   - Saccade rate (proportion of samples above saccade threshold)
   - Fixation rate (proportion of samples below fixation threshold)

5. **Task Duration**:
   - Total number of samples in the task

## Model Architecture

We use a Multi-Layer Perceptron (MLP) classifier with the following structure:
- Input layer (size depends on the number of features)
- Hidden layers: 64 units, 32 units, 16 units
- Output layer (size equals the number of unique users)
- A skip connection from input to output for improved learning

## Training Process

The model is trained using:
- Adam optimizer with a learning rate of 0.05
- Cross-entropy loss function
- 200 epochs
- Batch size of 256

## Results

The overall test accuracy achieved is 68%, indicating that the model can correctly identify users based on their eye movements in 68% of cases.

### Task-Specific Accuracies

1. Video-viewing task (VID): 93.03%
2. Random oblique saccade task (RAN): 72.11%
3. Horizontal smooth pursuit task (PUR): 68.33%
4. Self-paced reading task (TEX): 57.37%
5. Vergence task (VRG): 43.82%

## Analysis

The results show that different tasks have varying levels of effectiveness in identifying users:

1. The video-viewing task is the most effective, with an impressive 93.03% accuracy. This suggests that how people watch videos is highly individual and consistent.

2. The random oblique saccade task and horizontal smooth pursuit task show moderate effectiveness, with accuracies above the overall average.

3. The self-paced reading task and vergence task are less effective for user identification, with accuracies below the overall average.

These findings indicate that complex, naturalistic tasks like video viewing may be more suitable for eye movement-based user identification compared to simpler, controlled tasks like vergence exercises.

## Conclusion

This study demonstrates the potential of using eye movement patterns for user identification, with varying degrees of success across different tasks. The use of the Savitzky-Golay filter for velocity calculation provides a robust basis for feature extraction. Future work could focus on optimizing feature extraction for the most effective tasks or combining data from multiple tasks to improve overall accuracy.



In [2]:
position_columns = ['x_left', 'y_left', 'x_right', 'y_right']

# Calculate velocity
dataset.pos2vel( method='savitzky_golay', degree=2, window_length=7)
print(dataset.gaze[5].frame)

  0%|          | 0/5020 [00:00<?, ?it/s]

shape: (14_815, 16)
┌────────────┬────────────┬───────────┬────────┬───┬───────────┬───────────┬───────────┬───────────┐
│ time       ┆ x_target_p ┆ y_target_ ┆ zT     ┆ … ┆ session_i ┆ task_name ┆ position  ┆ velocity  │
│ ---        ┆ os         ┆ pos       ┆ ---    ┆   ┆ d         ┆ ---       ┆ ---       ┆ ---       │
│ f32        ┆ ---        ┆ ---       ┆ f32    ┆   ┆ ---       ┆ str       ┆ list[f32] ┆ list[f32] │
│            ┆ f32        ┆ f32       ┆        ┆   ┆ i64       ┆           ┆           ┆           │
╞════════════╪════════════╪═══════════╪════════╪═══╪═══════════╪═══════════╪═══════════╪═══════════╡
│ 0.0        ┆ 0.0        ┆ 0.0       ┆ 0.4433 ┆ … ┆ 2         ┆ 1_VRG     ┆ [4.9966,  ┆ [5.709814 │
│            ┆            ┆           ┆        ┆   ┆           ┆           ┆ 0.9662, … ┆ , -2.6580 │
│            ┆            ┆           ┆        ┆   ┆           ┆           ┆ 0.7913]   ┆ 34, …     │
│            ┆            ┆           ┆        ┆   ┆           ┆       

In [None]:
import numpy as np
from scipy import stats
import polars as pl
from tqdm import tqdm

def extract_features(gaze_data, n_portions=10, set_type='train'):
    features = []
    y = []
    task_names = []

    for gaze in tqdm(gaze_data):
        frame = gaze.frame
        # Create a temporary index column
        frame = frame.with_row_count("temp_index")

        if set_type == 'train':
            frame = frame.filter(pl.col('temp_index') >0)
        elif set_type == 'test':
            frame = frame.filter(pl.col('temp_index') >0)

        # Drop the temporary index column
        frame = frame.drop('temp_index')

        total_samples = frame.shape[0]
        portion_size = total_samples // n_portions

        # Initialize lists to store portion-wise statistics
        portion_features = []

        # Calculate statistics for each portion
        for i in range(n_portions):
            start_idx = i * portion_size
            end_idx = (i + 1) * portion_size if i < n_portions - 1 else total_samples

            portion = frame.slice(start_idx, end_idx - start_idx)

            # Velocity statistics
            velocities = portion['velocity'].apply(lambda x: np.linalg.norm(x)).to_numpy()

            # Position statistics
            positions = np.vstack(portion['position'].to_numpy())

            portion_stats = [
                np.mean(velocities),
                np.std(velocities),
                np.max(velocities),
                *np.mean(positions, axis=0),
                *np.std(positions, axis=0),
                stats.skew(velocities),
                stats.kurtosis(velocities)
            ]

            # Add any other relevant statistics from other columns
            if 'x_target_pos' in portion.columns and 'y_target_pos' in portion.columns:
                portion_stats.extend([
                    portion['x_target_pos'].mean(),
                    portion['y_target_pos'].mean(),
                    portion['x_target_pos'].std(),
                    portion['y_target_pos'].std()
                ])

            portion_features.extend(portion_stats)

        # Global features
        velocities = frame['velocity'].apply(lambda x: np.linalg.norm(x)).to_numpy()
        positions = np.vstack(frame['position'].to_numpy())

        global_features = [
            np.mean(velocities),
            np.std(velocities),
            np.max(velocities),
            *np.mean(positions, axis=0),
            *np.std(positions, axis=0),
            stats.skew(velocities),
            stats.kurtosis(velocities)
        ]

        # Saccade and fixation features
        saccade_threshold = np.mean(velocities) + 2 * np.std(velocities)
        fixation_threshold = np.mean(velocities) - np.std(velocities)
        saccade_rate = np.sum(velocities > saccade_threshold) / total_samples
        fixation_rate = np.sum(velocities < fixation_threshold) / total_samples

        # Task-specific features
        task_name = frame['task_name'][0]
        task_duration = total_samples  # Assuming constant sampling rate

        # Combine all features
        feature_vector = portion_features + global_features + [saccade_rate, fixation_rate, task_duration]

        features.append(feature_vector)
        y.append(frame['subject_id'][0])
        task_names.append(task_name)

    return np.array(features), np.array(y), np.array(task_names)

# Extract features for training set (1st and 2nd rows)
X_train, y_train, task_names_train = extract_features(dataset.gaze, n_portions=200, set_type='train')

# Extract features for test set (every 3rd row)
X_test, y_test, task_names_test = extract_features(dataset.gaze, n_portions=200, set_type='test')

# Save the features
np.save('X_train.npy', X_train)
np.save('y_train.npy', y_train)
np.save('task_names_train.npy', task_names_train)

np.save('X_test.npy', X_test)
np.save('y_test.npy', y_test)
np.save('task_names_test.npy', task_names_test)

print(f"Training set shape: {X_train.shape}")
print(f"Testing set shape: {X_test.shape}")

  velocities = portion['velocity'].apply(lambda x: np.linalg.norm(x)).to_numpy()
  velocities = frame['velocity'].apply(lambda x: np.linalg.norm(x)).to_numpy()
 50%|█████     | 2519/5020 [53:02<44:13,  1.06s/it]

In [None]:
import os
from collections import defaultdict

import numpy as np
import torch
from sklearn.metrics import accuracy_score
from torch import nn
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm


class EyeMovementDataset(Dataset):
    def __init__(self, X, y, user_to_index):
        self.X = torch.FloatTensor(X)
        self.y = torch.LongTensor([user_to_index[user] for user in y])

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]


class MLPClassifier(nn.Module):
    def __init__(self, input_size, num_classes):
        super(MLPClassifier, self).__init__()
        self.fc1 = nn.Linear(input_size, 64)
        self.fc2 = nn.Linear(64, 16)
        self.fc5 = nn.Linear(16, num_classes)
        self.fc6 = nn.Linear(input_size, num_classes)

    def forward(self, x):
        out = torch.relu(self.fc1(x))
        out = torch.relu(self.fc2(out))
        out = self.fc5(out)
        d = self.fc6(x)
        return out + 10 * d


def evaluate_task_accuracy(
    model,
    X_test,
    y_test,
    task_names_test,
    user_to_index,
    index_to_user,
    specific_task=None,
):
    model.eval()
    device = next(model.parameters()).device

    X_test_tensor = torch.FloatTensor(X_test).to(device)
    y_test_tensor = torch.LongTensor([user_to_index[user] for user in y_test]).to(
        device
    )

    with torch.no_grad():
        outputs = model(X_test_tensor)
        probabilities = torch.softmax(outputs, dim=1)
        _, predicted = torch.max(outputs.data, 1)

    predicted = predicted.cpu().numpy()
    probabilities = probabilities.cpu().numpy()

    task_results = defaultdict(lambda: {"true": [], "pred": [], "prob": []})
    for true, pred, prob, task in zip(
        y_test, predicted, probabilities, task_names_test
    ):
        task_results[task]["true"].append(true)
        task_results[task]["pred"].append(index_to_user[pred])
        task_results[task]["prob"].append(prob[user_to_index[true]])

    task_accuracies = {}
    for task, results in task_results.items():
        if specific_task is None or task == specific_task:
            accuracy = accuracy_score(results["true"], results["pred"])
            avg_probability = np.mean(results["prob"])
            task_accuracies[task] = {
                "accuracy": accuracy,
                "avg_probability": avg_probability,
            }

    if specific_task:
        if specific_task in task_accuracies:
            print(f"Task: {specific_task}")
            # print(f"Accuracy: {task_accuracies[specific_task]['accuracy']:.2%}")
            # print(
            #     f"Average Probability: {task_accuracies[specific_task]['avg_probability']:.4f}"
            # )
        else:
            print(f"Task {specific_task} not found in the test set.")
    else:
        sorted_tasks = sorted(
            task_accuracies.items(), key=lambda x: x[1]["accuracy"], reverse=True
        )
        print("Task Accuracies and Average Probabilities:")
        for task, metrics in sorted_tasks:
            print(
                f"{task}: Accuracy: {metrics['accuracy']:.2%}, Avg Probability: {metrics['avg_probability']:.4f}"
            )

    return task_accuracies


# Load the data
X_train = np.load("X_train.npy", allow_pickle=True).astype(np.float32)
y_train = np.load("y_train.npy", allow_pickle=True)
X_test = np.load("X_test.npy", allow_pickle=True).astype(np.float32)
y_test = np.load("y_test.npy", allow_pickle=True)
task_names_test = np.load("task_names_test.npy", allow_pickle=True)

# Get unique task names
unique_task_names = np.unique(task_names_test)
# print("Available tasks:", unique_task_names)

# Create user ID to index mapping
unique_users = np.unique(np.concatenate([y_train, y_test]))
user_to_index = {user: idx for idx, user in enumerate(unique_users)}
index_to_user = {idx: user for user, idx in user_to_index.items()}

# Check for NaN or infinite values
X_train = np.nan_to_num(X_train, nan=0, posinf=1e6, neginf=-1e6)
X_test = np.nan_to_num(X_test, nan=0, posinf=1e6, neginf=-1e6)

# Initialize model
input_size = X_train.shape[1]
num_classes = len(unique_users)
model = MLPClassifier(input_size, num_classes)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Check if a trained model exists
model_path = "model.pth"
if os.path.exists(model_path):
    print("Loading existing model...")
    model.load_state_dict(torch.load(model_path))
else:
    print("Training new model...")
    # Create datasets and dataloaders
    train_dataset = EyeMovementDataset(X_train, y_train, user_to_index)
    train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)

    # Loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.05)

    # Training loop
    num_epochs = 200

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for batch_X, batch_y in tqdm(
            train_dataloader, desc=f"Epoch {epoch+1}/{num_epochs}"
        ):
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(
            f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(train_dataloader):.4f}"
        )

    # Save the model
    torch.save(model.state_dict(), model_path)

# Evaluation on test set
model.eval()
test_dataset = EyeMovementDataset(X_test, y_test, user_to_index)
test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)

with torch.no_grad():
    correct = 0
    total = 0
    for batch_X, batch_y in test_dataloader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        outputs = model(batch_X)
        _, predicted = torch.max(outputs.data, 1)
        total += batch_y.size(0)
        correct += (predicted == batch_y).sum().item()
        # print predicted user vs true user
        print(predicted[1], batch_y[1])

    print(f"Overall Test Accuracy: {100 * correct / total:.2f}%")

chosen_task = "3_VID"
task_accuracies = evaluate_task_accuracy(
    model, X_test, y_test, task_names_test, user_to_index, index_to_user, chosen_task
)


def predict_user_data(model, test_dataloader, indexes, index_to_user, device):
    model.eval()
    all_predictions = []
    all_true_labels = []

    with torch.no_grad():
        for i, (batch_X, batch_y) in enumerate(test_dataloader):
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            outputs = model(batch_X)
            _, predicted = torch.max(outputs.data, 1)

            for j, idx in enumerate(
                range(
                    i * test_dataloader.batch_size, (i + 1) * test_dataloader.batch_size
                )
            ):
                if idx in indexes:
                    pred_user = index_to_user[predicted[j].item()]
                    true_user = index_to_user[batch_y[j].item()]
                    all_predictions.append(pred_user)
                    all_true_labels.append(true_user)
                    print(
                        f"Sample {idx}: Predicted User: {pred_user}, True User: {true_user}"
                    )

    return all_predictions, all_true_labels


# Usage example:
indexes_to_predict = [0, 10, 20, 30, 40, 41, 42, 136, 347]  # Example indexes
predictions, true_labels = predict_user_data(
    model, test_dataloader, indexes_to_predict, index_to_user, device
)
