# Tutorial 4: On-Device Eye-tracking and Hand-tracking data streams

## Introduction

In Aria-Gen2 glasses, one of the key upgrades over Aria-Gen1 is the ability to run Machine Perception (MP) algorithms on the device during streaming or recording. Currently supported on-device MP algorithms include Eye-tracking, Hand-tracking, and VIO. Their results are stored as separate data streams in the VRS file.

This tutorial demonstrates how to use the **Eye-tracking and Hand-tracking** results.

**What you'll learn:**

- How to access on-device EyeGaze and HandTracking data from VRS files
- Understanding the concept of interpolated hand tracking and why interpolation
  is needed
- How to visualize EyeGaze and HandTracking data projected onto 2D camera images
  using DeviceCalibration
- How to match MP data with camera frames using timestamps

**Prerequisites**
- Complete Tutorial 1 (VrsDataProvider Basics) to understand basic data provider concepts
- Complete Tutorial 2 (Device Calibration) to understand how to properly use calibration in Aria data. 

In [None]:
from projectaria_tools.core import data_provider

# Load local VRS file
vrs_file_path = "path/to/your/recording.vrs"
vrs_data_provider = data_provider.create_vrs_data_provider(vrs_file_path)


# Query EyeGaze data streams
eyegaze_label = "eyegaze"
eyegaze_stream_id = vrs_data_provider.get_stream_id_from_label(eyegaze_label)
if eyegaze_stream_id is None:
    raise RuntimeError(
        f"{eyegaze_label} data stream does not exist! Please use a VRS that contains valid eyegaze data for this tutorial."
    )

# Query HandTracking data streams
handtracking_label = "handtracking"
handtracking_stream_id = vrs_data_provider.get_stream_id_from_label(handtracking_label)
if handtracking_stream_id is None:
    raise RuntimeError(
        f"{handtracking_label} data stream does not exist! Please use a VRS that contains valid handtracking data for this tutorial."
    )

## On-Device Eye-tracking results
### EyeGaze Data Structure

The EyeGaze data type represents on-device eye tracking results. 
**Importantly, it directly reuses [the EyeGaze data structure](https://github.com/facebookresearch/projectaria_tools/blob/main/core/mps/EyeGaze.h)
 from MPS (Machine Perception Services)**, providing guaranteed compatibility across VRS and MPS.

**Key `EyeGaze` fields**

| Field Name | Description |
| :--------- | :---------- |
| `session_uid` | Unique ID for the eye-tracking session |
| `tracking_timestamp` | Timestamp of the eye-tracking camera frame in the device time domain, with microsecond resolution (exposed as a `datetime.timedelta` in Python) |
| `vergence.t[x,y,z]_[left,right]_eye` | Translation of each eye's origin in the CPF frame |
| `yaw`, `vergence.[left,right]_yaw` | Eye-gaze yaw angle (horizontal), in radians, in the CPF frame |
| `pitch`, `vergence.[left,right]_pitch` (Gen2-only) | Eye-gaze pitch angle (vertical), in radians, in the CPF frame. In Aria-Gen1, the left and right pitch are assumed to be the same. |
| `depth` | Depth, in meters, of the 3D eye-gaze point in the CPF frame (0 = unavailable) |
| `yaw_low`, `yaw_high`, `pitch_low`, `pitch_high` | Confidence-interval bounds for the yaw and pitch angles |
| **Aria-Gen2 specific fields** | |
| `combined_gaze_origin_in_cpf` | Combined gaze origin in the CPF frame |
| `spatial_gaze_point_in_cpf` | 3D spatial gaze point in the CPF frame |
| `vergence.[left,right]_entrance_pupil_position_meter` | Entrance pupil position of each eye, in meters |
| `vergence.[left,right]_pupil_diameter_meter` | Entrance pupil diameter of each eye, in meters |
| `vergence.[left,right]_blink` | Blink detection for the left and right eyes |
| `*_valid` | Boolean flags indicating whether the corresponding EyeGaze field is valid |
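
The `yaw`/`pitch` pair encodes a gaze direction in CPF, and `depth` places a 3D point along it. The sketch below shows one way to turn these fields into a direction and a point. It assumes the tangent-based convention that `get_unit_vector_from_yaw_pitch` in projectaria_tools is understood to follow (x = tan(yaw), y = tan(pitch), z = 1, then normalized), and assumes `depth` is measured along the unit gaze ray; please verify both against the library source before relying on them. `gaze_direction_cpf` and `gaze_point_at_depth_cpf` are hypothetical helper names.

```python
import math


def gaze_direction_cpf(yaw_rad: float, pitch_rad: float) -> tuple:
    """Unit gaze direction in CPF from yaw/pitch.

    ASSUMPTION: tangent-based convention (x = tan(yaw), y = tan(pitch), z = 1),
    normalized to unit length. Verify against projectaria_tools before use.
    """
    x, y, z = math.tan(yaw_rad), math.tan(pitch_rad), 1.0
    norm = math.sqrt(x * x + y * y + z * z)
    return (x / norm, y / norm, z / norm)


def gaze_point_at_depth_cpf(yaw_rad: float, pitch_rad: float, depth_m: float) -> tuple:
    """3D gaze point in CPF, ASSUMING depth is measured along the unit gaze ray."""
    return tuple(depth_m * c for c in gaze_direction_cpf(yaw_rad, pitch_rad))


# Looking straight ahead at 1.5 m
print(gaze_point_at_depth_cpf(0.0, 0.0, 1.5))  # (0.0, 0.0, 1.5)
```

With this convention, positive yaw points toward +x and positive pitch toward +y of CPF, and a zero `depth` (unavailable) would collapse the point onto the CPF origin, so check `depth` before using the point.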




### EyeGaze API Reference
In `vrs_data_provider`, EyeGaze is treated like any other sensor data and shares the query APIs covered in `Tutorial_1_vrs_data_provider_basics`:
- `vrs_data_provider.get_eye_gaze_data_by_index(stream_id, index)`: Query by index. 
- `vrs_data_provider.get_eye_gaze_data_by_time_ns(stream_id, timestamp, time_domain, query_options)`: Query by timestamp. 
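
The query-by-time API takes integer nanoseconds, whereas `tracking_timestamp` on an `EyeGaze` sample is a `datetime.timedelta` with microsecond resolution (the sample code in this tutorial converts it with integer division). A small helper can keep that conversion in one place. `timedelta_to_ns` is a hypothetical name, not part of projectaria_tools, and the commented round trip at the end assumes the provider and stream ID from the earlier cells.

```python
from datetime import timedelta


def timedelta_to_ns(td: timedelta) -> int:
    """Convert a timedelta (microsecond resolution) to integer nanoseconds.

    Floor-dividing by 1 us keeps the arithmetic in integers instead of
    going through td.total_seconds(), which returns a float.
    """
    return (td // timedelta(microseconds=1)) * 1000


# A device timestamp of 12.345678 s
print(timedelta_to_ns(timedelta(seconds=12, microseconds=345678)))  # 12345678000

# Hypothetical round trip (requires vrs_data_provider and eyegaze_stream_id):
# sample = vrs_data_provider.get_eye_gaze_data_by_index(eyegaze_stream_id, 0)
# same = vrs_data_provider.get_eye_gaze_data_by_time_ns(
#     eyegaze_stream_id, timedelta_to_ns(sample.tracking_timestamp),
#     TimeDomain.DEVICE_TIME, TimeQueryOptions.CLOSEST)
```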

In [None]:
from projectaria_tools.core.mps import get_unit_vector_from_yaw_pitch
from datetime import timedelta

print("=== EyeGaze Data Sample ===")
num_eyegaze_samples = vrs_data_provider.get_num_data(eyegaze_stream_id)
selected_index = min(5, num_eyegaze_samples - 1)
print(f"Sample {selected_index}:")

eyegaze_data = vrs_data_provider.get_eye_gaze_data_by_index(eyegaze_stream_id, selected_index)

# tracking_timestamp is a datetime.timedelta with microsecond resolution; convert it to integer nanoseconds
eyegaze_timestamp_ns = (eyegaze_data.tracking_timestamp // timedelta(microseconds=1)) * 1000
print(f"  Tracking timestamp: {eyegaze_timestamp_ns} ns")

# check if combined gaze is valid, if so, print out the gaze direction
print(f"  Combined gaze valid: {eyegaze_data.combined_gaze_valid}")
if eyegaze_data.combined_gaze_valid:
    print(f"  Yaw: {eyegaze_data.yaw:.3f} rad")
    print(f"  Pitch: {eyegaze_data.pitch:.3f} rad")
    print(f"  Depth: {eyegaze_data.depth:.3f} m")
    # Can also print gaze direction in unit vector
    gaze_direction_in_unit_vec = get_unit_vector_from_yaw_pitch(eyegaze_data.yaw, eyegaze_data.pitch)
    print(f"  Gaze direction in unit vec [xyz]: {gaze_direction_in_unit_vec}")

# Check if spatial gaze point is valid, if so, print out the spatial gaze point
print(
    f"  Spatial gaze point valid: {eyegaze_data.spatial_gaze_point_valid}"
)
if eyegaze_data.spatial_gaze_point_valid:
    print(
        f"  Spatial gaze point in CPF: {eyegaze_data.spatial_gaze_point_in_cpf}"
    )

### EyeGaze visualization in camera images
To visualize EyeGaze in camera images, project the eye-tracking results into each image using that camera's calibration. Note, however, the coordinate-frame difference described below.

**EyeGaze Coordinate System - Central Pupil Frame (CPF)** 

All eye-tracking results in Aria are expressed in a reference coordinate system called the **Central Pupil Frame (`CPF`)**, whose origin is approximately the midpoint between the user's two eyes. Note that this **`CPF` frame is DIFFERENT from the `Device` frame used in device calibration**, which is essentially the `slam-front-left` camera (Gen2) or the `camera-slam-left` camera (Gen1). To transform between `CPF` and `Device`, use the following API to query their relative pose (see the next code cell for usage):
```
device_calibration.get_transform_device_cpf()
``` 
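
The chain used in the next cell is `p_camera = inverse(T_device_camera) @ T_device_cpf @ p_cpf`. The sketch below spells it out with plain 4x4 NumPy matrices so each step is visible. All transform values and pinhole intrinsics are made up for illustration, and `rigid_inverse`, `transform_point`, and `project_pinhole` are hypothetical helpers. Real Aria cameras use fisheye/distortion models, so in practice use `camera_calib.project()` as the tutorial does rather than this pinhole formula.

```python
import numpy as np


def rigid_inverse(T: np.ndarray) -> np.ndarray:
    """Invert a 4x4 rigid transform using R^T and -R^T t."""
    R, t = T[:3, :3], T[:3, 3]
    T_inv = np.eye(4)
    T_inv[:3, :3] = R.T
    T_inv[:3, 3] = -R.T @ t
    return T_inv


def transform_point(T: np.ndarray, p: np.ndarray) -> np.ndarray:
    """Apply a 4x4 transform to a 3D point via homogeneous coordinates."""
    return (T @ np.append(p, 1.0))[:3]


def project_pinhole(p_cam, fx, fy, cx, cy):
    """Hypothetical pinhole projection; returns None for points behind the camera."""
    x, y, z = p_cam
    if z <= 0:
        return None
    return np.array([fx * x / z + cx, fy * y / z + cy])


# Made-up extrinsics: CPF is 5 cm along +x of Device; the camera is 2 cm along +y of Device.
T_device_cpf = np.eye(4)
T_device_cpf[:3, 3] = [0.05, 0.0, 0.0]
T_device_camera = np.eye(4)
T_device_camera[:3, 3] = [0.0, 0.02, 0.0]

gaze_point_cpf = np.array([0.0, 0.0, 1.0])  # 1 m straight ahead of CPF
gaze_point_device = transform_point(T_device_cpf, gaze_point_cpf)  # CPF -> Device
gaze_point_camera = transform_point(rigid_inverse(T_device_camera), gaze_point_device)  # Device -> Camera
print(gaze_point_camera)  # approx [0.05, -0.02, 1.0]
print(project_pinhole(gaze_point_camera, fx=500.0, fy=500.0, cx=320.0, cy=240.0))  # approx [345, 230]
```

Skipping the `T_device_cpf` step would shift the projected gaze point by the CPF-to-Device offset, which is exactly the mistake the note above warns about.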

In [None]:
import rerun as rr
from projectaria_tools.core.sensor_data import SensorDataType, TimeDomain, TimeQueryOptions

def plot_eyegaze_in_camera(eyegaze_data, camera_label, camera_calib, T_device_cpf):
    """
    A helper function to plot eyegaze's spatial gaze point into a camera image
    """
    # Skip if eyegaze data is invalid
    if not (
        eyegaze_data.spatial_gaze_point_valid and eyegaze_data.combined_gaze_valid
    ):
        return

    # First, transform spatial gaze point from CPF -> Device -> Camera frame
    spatial_gaze_point_in_cpf = eyegaze_data.spatial_gaze_point_in_cpf
    spatial_gaze_point_in_device = T_device_cpf @ spatial_gaze_point_in_cpf
    spatial_gaze_point_in_camera = (
        camera_calib.get_transform_device_camera().inverse()
        @ spatial_gaze_point_in_device
    )

    # Project into camera and plot 2D gaze location
    maybe_pixel = camera_calib.project(spatial_gaze_point_in_camera)
    if maybe_pixel is not None:
        rr.log(
            f"{camera_label}/gaze_point",
            rr.Points2D(
                positions=[maybe_pixel],
                colors=[255, 64, 255],
                radii = [30.0]
            ),
        )

print("\n=== Visualizing on-device eye tracking in camera images ===")

# First, query the RGB camera stream ids
device_calib = vrs_data_provider.get_device_calibration()
T_device_cpf = device_calib.get_transform_device_cpf()

rgb_camera_label = "camera-rgb"
rgb_stream_id = vrs_data_provider.get_stream_id_from_label(rgb_camera_label)
rgb_camera_calib = device_calib.get_camera_calib(rgb_camera_label)

rr.init("rerun_viz_et_in_cameras")

# Set up a sensor queue with only RGB image + EyeGaze
deliver_options = vrs_data_provider.get_default_deliver_queued_options()
deliver_options.deactivate_stream_all()
deliver_options.activate_stream(rgb_stream_id)
deliver_options.activate_stream(eyegaze_stream_id)

# Play for only 3 seconds
total_length_ns = vrs_data_provider.get_last_time_ns_all_streams(TimeDomain.DEVICE_TIME) - vrs_data_provider.get_first_time_ns_all_streams(TimeDomain.DEVICE_TIME)
skip_begin_ns = int(15 * 1e9) # Skip 15 seconds
duration_ns = int(3 * 1e9) # 3 seconds
skip_end_ns = max(total_length_ns - skip_begin_ns - duration_ns, 0)
deliver_options.set_truncate_first_device_time_ns(skip_begin_ns)
deliver_options.set_truncate_last_device_time_ns(skip_end_ns)

# Plot image data, and plot EyeGaze on top of RGB image data
for sensor_data in vrs_data_provider.deliver_queued_sensor_data(deliver_options):
    stream_id = sensor_data.stream_id()
    data_type = sensor_data.sensor_data_type()

    # ---------------
    # Image data: plot RGB images. 
    # ---------------
    if data_type == SensorDataType.IMAGE:
        # Extract the image data and log it to Rerun
        device_time_ns = sensor_data.get_time_ns(TimeDomain.DEVICE_TIME)
        image_data_and_record = sensor_data.image_data_and_record()

        # Visualize the images
        rr.set_time("device_time", duration = device_time_ns * 1e-9)
        rr.log(rgb_camera_label, rr.Image(image_data_and_record[0].to_numpy_array()))

    # ---------------
    #  Eye gaze data: plot EyeGaze's projection into camera images
    # ---------------
    elif data_type == SensorDataType.EYE_GAZE:
        device_time_ns = sensor_data.get_time_ns(TimeDomain.DEVICE_TIME)
        eye_gaze = sensor_data.eye_gaze_data()

        # Plot Eyegaze overlay on top of camera images
        rr.set_time("device_time", duration = device_time_ns * 1e-9)
        plot_eyegaze_in_camera(eyegaze_data = eye_gaze, camera_label = rgb_camera_label, camera_calib = rgb_camera_calib, T_device_cpf = T_device_cpf)


rr.notebook_show()


## On-Device Hand-tracking results
### Handtracking Data Structure
HandTracking data contains comprehensive 3D hand pose information. 
**Importantly, it directly reuses the [HandTrackingResults data structure](https://github.com/facebookresearch/projectaria_tools/blob/main/core/mps/HandTracking.h) from MPS (Machine Perception
Services)**, providing guaranteed compatibility across VRS and MPS.

**Key Fields in `HandTrackingResults`**

| Field Name | Description |
| ---------- | ----------- |
| `tracking_timestamp` | Timestamp of the hand-tracking estimate in the device time domain. |
| `left_hand` | Left-hand pose, or `None` if no valid pose is found for the timestamp. |
| `right_hand` | Right-hand pose, or `None` if no valid pose is found for the timestamp. |

**Single Hand fields (left or right):**

| Field Name | Description |
| ---------- | ----------- |
| `confidence` | Tracking confidence score for this hand. |
| `landmark_positions_device` | List of 21 hand-landmark positions in the device frame (3D points). <br>See the [wiki page](https://facebookresearch.github.io/projectaria_tools/docs/data_formats/mps/hand_tracking#hand_tracking_resultscsv) for landmark definitions. |
| `transform_device_wrist` | Full SE3 transform of the wrist in the `Device` frame. |
| `wrist_and_palm_normal_device` | Normal vectors for the wrist and palm joints in the `Device` frame. |

### Handtracking Coordinate System
All hand-tracking results in Aria are expressed in the `Device` coordinate frame, the same frame used by device calibration. See `Tutorial_2_device_calibration` for the definition of the `Device` frame.

In [None]:
import numpy as np

def print_single_hand_information(single_hand):
    """
    A helper function to print the hand tracking result of one hand
    """
    print(f"    Confidence: {single_hand.confidence:.3f}")
    print(
        f"    Landmarks shape: {np.array(single_hand.landmark_positions_device).shape}"
    )
    print(
        f"    Wrist location: {single_hand.get_wrist_position_device()}"
    )
    print(
        f"    Palm location: {single_hand.get_palm_position_device()}"
    )

print("=== HandTracking Data Sample ===")
num_handtracking_samples = vrs_data_provider.get_num_data(handtracking_stream_id)
selected_index = min(5, num_handtracking_samples - 1)
hand_data = vrs_data_provider.get_hand_pose_data_by_index(
    handtracking_stream_id, selected_index
)

print(f"Sample {selected_index}:")
print(f"  Tracking timestamp: {hand_data.tracking_timestamp}")

# Print the content of left and right hand if valid
if hand_data.left_hand is not None:
    print("  Left hand: Detected")
    print_single_hand_information(hand_data.left_hand)
else:
    print("  Left hand: Not detected")

if hand_data.right_hand is not None:
    print("  Right hand: Detected")
    print_single_hand_information(hand_data.right_hand)
else:
    print("  Right hand: Not detected")

### Interpolated Hand-tracking Results
**Context:**

In Aria-Gen2 glasses, **the on-device hand-tracking data are computed from the SLAM cameras, not the RGB camera**.
Meanwhile, the SLAM cameras and the RGB camera often run at different sampling frequencies, and their triggers are not aligned either.
As a result, hand-tracking timestamps often do NOT line up with RGB camera timestamps, which makes it harder to accurately visualize hand-tracking results in RGB images.

**API to query interpolated handtracking results**

To resolve this, `vrs_data_provider` provides a dedicated query API for hand-tracking results:
```
vrs_data_provider.get_interpolated_hand_pose_data(stream_id, timestamp_ns)
```
which returns an interpolated hand-tracking result for any timestamp within the valid time range of the VRS file, or `None` when interpolation is not possible (see the rules below).

**Handtracking Interpolation Implementation**

1. Find the two hand-tracking results nearest to the target timestamp, one before and one after it.  
2. If the time delta between these two results is larger than 100 ms, interpolation is considered unreliable → return `None`.  
3. Otherwise, interpolate each hand separately:  
   a. For the left or right hand, perform interpolation **only if both the "before" and "after" samples contain a valid result for that hand**.  
   b. If either sample is missing, the interpolated result for that hand will be `None`.  Example:  
      ```text
      interpolate(
          before = [left = valid, right = None],
          after  = [left = valid, right = valid]
      )
      → result = [left = interpolated, right = None]
      ```
4. Single-hand interpolation is calculated as:  
   a. Apply linear interpolation on the 3D hand landmark positions.  
   b. Apply SE3 interpolation on `T_Device_Wrist` 3D pose.  
   c. Re-calculate the wrist and palm normal vectors.  
   d. Take the `min` of confidence values.  
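
The steps above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the projectaria_tools implementation: `HandSample`, `HandFrame`, and the functions below are hypothetical, rotations are unit quaternions `(w, x, y, z)`, and step 4b is approximated by slerp on the wrist rotation plus linear interpolation of its translation (interpolation on SO(3) × R³, which is simpler than a true SE(3) geodesic). Step 4c (recomputing wrist and palm normals) is omitted.

```python
import bisect
import math
from dataclasses import dataclass
from typing import List, Optional, Tuple

Vec3 = Tuple[float, float, float]
Quat = Tuple[float, float, float, float]  # (w, x, y, z), unit norm
MAX_GAP_NS = 100_000_000  # 100 ms threshold from step 2


@dataclass
class HandSample:  # hypothetical stand-in for one hand's result
    confidence: float
    landmarks: List[Vec3]  # landmark positions in the Device frame
    wrist_rotation: Quat  # rotation part of T_Device_Wrist
    wrist_translation: Vec3  # translation part of T_Device_Wrist


@dataclass
class HandFrame:  # hypothetical stand-in for one HandTrackingResults sample
    timestamp_ns: int
    left: Optional[HandSample]
    right: Optional[HandSample]


def lerp3(a: Vec3, b: Vec3, t: float) -> Vec3:
    return tuple(ai + t * (bi - ai) for ai, bi in zip(a, b))


def slerp(q0: Quat, q1: Quat, t: float) -> Quat:
    dot = sum(a * b for a, b in zip(q0, q1))
    if dot < 0.0:  # q and -q are the same rotation: take the shorter arc
        q1, dot = tuple(-c for c in q1), -dot
    if dot > 0.9995:  # nearly identical: normalized lerp avoids dividing by ~0
        q = tuple(a + t * (b - a) for a, b in zip(q0, q1))
        n = math.sqrt(sum(c * c for c in q))
        return tuple(c / n for c in q)
    theta = math.acos(dot)
    s0 = math.sin((1.0 - t) * theta) / math.sin(theta)
    s1 = math.sin(t * theta) / math.sin(theta)
    return tuple(s0 * a + s1 * b for a, b in zip(q0, q1))


def interpolate_hand(h0: Optional[HandSample], h1: Optional[HandSample], t: float) -> Optional[HandSample]:
    if h0 is None or h1 is None:  # step 3: this hand must exist on both sides
        return None
    return HandSample(
        confidence=min(h0.confidence, h1.confidence),  # step 4d
        landmarks=[lerp3(p0, p1, t) for p0, p1 in zip(h0.landmarks, h1.landmarks)],  # step 4a
        wrist_rotation=slerp(h0.wrist_rotation, h1.wrist_rotation, t),  # step 4b (approximation)
        wrist_translation=lerp3(h0.wrist_translation, h1.wrist_translation, t),
    )


def interpolate_at(frames: List[HandFrame], query_ns: int) -> Optional[HandFrame]:
    """Interpolate hand poses at query_ns; frames must be sorted by timestamp_ns."""
    stamps = [f.timestamp_ns for f in frames]
    i = bisect.bisect_left(stamps, query_ns)  # step 1: bracket the query time
    if i < len(frames) and stamps[i] == query_ns:
        return frames[i]  # exact hit, nothing to interpolate
    if i == 0 or i == len(frames):
        return None  # query lies outside the recorded range
    before, after = frames[i - 1], frames[i]
    gap = after.timestamp_ns - before.timestamp_ns
    if gap > MAX_GAP_NS:  # step 2
        return None
    t = (query_ns - before.timestamp_ns) / gap
    return HandFrame(
        query_ns,
        interpolate_hand(before.left, after.left, t),
        interpolate_hand(before.right, after.right, t),
    )


# Usage: left hand seen in both samples, right hand only in the later one.
ident = (1.0, 0.0, 0.0, 0.0)
yaw90 = (math.cos(math.pi / 4), 0.0, 0.0, math.sin(math.pi / 4))  # 90 deg about z
f0 = HandFrame(0, HandSample(0.9, [(0.0, 0.0, 0.0)], ident, (0.0, 0.0, 0.0)), None)
f1 = HandFrame(
    50_000_000,
    HandSample(0.7, [(0.2, 0.0, 0.0)], yaw90, (0.2, 0.0, 0.0)),
    HandSample(0.8, [(1.0, 1.0, 1.0)], ident, (1.0, 1.0, 1.0)),
)
mid = interpolate_at([f0, f1], 25_000_000)
print(mid.left.landmarks, mid.left.confidence, mid.right)  # [(0.1, 0.0, 0.0)] 0.7 None
```

The usage mirrors the `before = [left = valid, right = None]` example above: the left hand is blended halfway (landmarks, wrist rotation, and the lower confidence), while the right hand stays `None`.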


In [None]:
from projectaria_tools.core.sensor_data import SensorDataType, TimeDomain, TimeQueryOptions
from datetime import timedelta

print("\n=== Demonstrating query interpolated hand tracking results ===")

# Demonstrate how to query interpolated handtracking results
slam_stream_id = vrs_data_provider.get_stream_id_from_label("slam-front-left")
rgb_stream_id = vrs_data_provider.get_stream_id_from_label("camera-rgb")

# Retrieve a SLAM frame, use its timestamp as query
slam_sample_index = min(10, vrs_data_provider.get_num_data(slam_stream_id) - 1)
slam_data_and_record = vrs_data_provider.get_image_data_by_index(slam_stream_id, slam_sample_index)
slam_timestamp_ns = slam_data_and_record[1].capture_timestamp_ns

# Retrieve the closest RGB frame
rgb_data_and_record = vrs_data_provider.get_image_data_by_time_ns(
    rgb_stream_id, slam_timestamp_ns, TimeDomain.DEVICE_TIME, TimeQueryOptions.CLOSEST
)
rgb_timestamp_ns = rgb_data_and_record[1].capture_timestamp_ns

# Retrieve the closest hand tracking data sample
raw_ht_data = vrs_data_provider.get_hand_pose_data_by_time_ns(
    handtracking_stream_id, slam_timestamp_ns, TimeDomain.DEVICE_TIME, TimeQueryOptions.CLOSEST
)
raw_ht_timestamp_ns = (raw_ht_data.tracking_timestamp // timedelta(microseconds=1)) * 1000

# Check if hand tracking aligns with RGB or SLAM data
print(f"SLAM timestamp: {slam_timestamp_ns}")
print(f"RGB timestamp:  {rgb_timestamp_ns}")
print(f"Hand tracking timestamp: {raw_ht_timestamp_ns}")
print(f"Hand tracking-SLAM time diff: {abs(raw_ht_timestamp_ns - slam_timestamp_ns) / 1e6:.2f} ms")
print(f"Hand tracking-RGB time diff: {abs(raw_ht_timestamp_ns - rgb_timestamp_ns) / 1e6:.2f} ms")

# Now, query interpolated hand tracking data sample using RGB timestamp.
interpolated_ht_data = vrs_data_provider.get_interpolated_hand_pose_data(
    handtracking_stream_id, rgb_timestamp_ns
)

# Check that interpolated hand tracking now aligns with RGB data
if interpolated_ht_data is not None:
    interpolated_ht_timestamp_ns = (interpolated_ht_data.tracking_timestamp// timedelta(microseconds=1)) * 1000
    print(f"Interpolated hand tracking timestamp: {interpolated_ht_timestamp_ns}")
    print(f"Interpolated hand tracking-RGB time diff: {abs(interpolated_ht_timestamp_ns - rgb_timestamp_ns) / 1e6:.2f} ms")
else:
    print("Interpolated hand tracking data is None - interpolation failed")

### Visualize Hand-tracking Results in Cameras
This section shows example code for visualizing the hand-tracking results in SLAM and RGB camera images.
To do so, project the hand-tracking results (landmarks and skeleton lines) into each camera image using that camera's calibration.
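
In the helper below, the skeleton is built before `None` pixels are dropped. Presumably this is because skeleton segments are defined by landmark indices, so filtering first would renumber the landmarks and connect the wrong joints. The sketch illustrates that idea with a hypothetical three-edge topology (`EDGES`) and a hypothetical `skeleton_segments` function, not the real 21-landmark hand model used by `create_hand_skeleton_from_landmarks`.

```python
from typing import List, Optional, Sequence, Tuple

Pixel = Tuple[float, float]

# Hypothetical topology: pairs of landmark indices joined by a bone.
EDGES = [(0, 1), (1, 2), (0, 2)]


def skeleton_segments(pixels: Sequence[Optional[Pixel]], edges=EDGES) -> List[List[Pixel]]:
    """Build 2D line segments by landmark index, skipping edges with an unprojectable endpoint."""
    segments = []
    for i, j in edges:
        if pixels[i] is not None and pixels[j] is not None:
            segments.append([pixels[i], pixels[j]])
    return segments


projected = [(10.0, 10.0), None, (30.0, 10.0)]  # landmark 1 failed to project
print(skeleton_segments(projected))  # [[(10.0, 10.0), (30.0, 10.0)]]

# Filtering first renumbers landmark 2 as landmark 1, so bone 0-1 connects the wrong joints.
bone_01 = [(0, 1)]
visible = [p for p in projected if p is not None]
print(skeleton_segments(projected, bone_01))  # []  (bone 0-1 correctly skipped)
print(skeleton_segments(visible, bone_01))  # [[(10.0, 10.0), (30.0, 10.0)]]  (wrong bone)
```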

In [None]:
import rerun as rr
from projectaria_tools.core.sensor_data import SensorDataType, TimeDomain, TimeQueryOptions
from projectaria_tools.utils.rerun_helpers import create_hand_skeleton_from_landmarks

def plot_single_hand_in_camera(hand_joints_in_device, camera_label, camera_calib, hand_label):
    """
    A helper function to plot a single hand data in 2D camera view
    """
    # Setting different marker plot sizes for RGB and SLAM since they have different resolutions
    plot_ratio = 3.0 if camera_label == "camera-rgb" else 1.0
    marker_color = [255,64,0] if hand_label == "left" else [255, 255, 0]

    # project into camera frame, and also create line segments
    hand_joints_in_camera = []
    for pt_in_device in hand_joints_in_device:
        pt_in_camera = (
            camera_calib.get_transform_device_camera().inverse() @ pt_in_device
        )
        pixel = camera_calib.project(pt_in_camera)
        hand_joints_in_camera.append(pixel)

    # Create hand skeleton in 2D image space
    hand_skeleton = create_hand_skeleton_from_landmarks(hand_joints_in_camera)

    # Remove "None" markers from hand joints in camera. This is intentionally done AFTER the hand skeleton creation
    hand_joints_in_camera = list(
        filter(lambda x: x is not None, hand_joints_in_camera)
    )

    rr.log(
        f"{camera_label}/{hand_label}/landmarks",
        rr.Points2D(
            positions=hand_joints_in_camera,
            colors= marker_color,
            radii= [3.0 * plot_ratio]
        ),
    )
    rr.log(
        f"{camera_label}/{hand_label}/skeleton",
        rr.LineStrips2D(
            hand_skeleton,
            colors=[0, 255, 0],
            radii= [0.5 * plot_ratio],
        ),
    )

def plot_handpose_in_camera(hand_pose, camera_label, camera_calib):
    """
    A helper function to plot hand tracking results into a camera image
    """
    # Clear the canvas first
    #rr.log(
    #    f"{camera_label}/handtracking",
    #    rr.Clear.recursive(),
    #)

    # Plot both hands
    if hand_pose.left_hand is not None:
        plot_single_hand_in_camera(
            hand_joints_in_device=hand_pose.left_hand.landmark_positions_device,
            camera_label=camera_label,
            camera_calib = camera_calib,
            hand_label="left")
    if hand_pose.right_hand is not None:
        plot_single_hand_in_camera(
            hand_joints_in_device=hand_pose.right_hand.landmark_positions_device,
            camera_label=camera_label,
            camera_calib = camera_calib,
            hand_label="right")


In [None]:
print("\n=== Visualizing on-device hand tracking in camera images ===")

# First, query the RGB and SLAM camera stream IDs
device_calib = vrs_data_provider.get_device_calibration()
rgb_camera_label = "camera-rgb"
slam_camera_labels = ["slam-front-left", "slam-front-right", "slam-side-left", "slam-side-right"]
rgb_stream_id = vrs_data_provider.get_stream_id_from_label(rgb_camera_label)
slam_stream_ids = [vrs_data_provider.get_stream_id_from_label(label) for label in slam_camera_labels]

rr.init("rerun_viz_ht_in_cameras")

# Set up a sensor queue with only the RGB and SLAM image streams.
# Hand-tracking data will be queried with the interpolated API.
deliver_options = vrs_data_provider.get_default_deliver_queued_options()
deliver_options.deactivate_stream_all()
for stream_id in slam_stream_ids + [rgb_stream_id]:
    deliver_options.activate_stream(stream_id)

# Play for only 3 seconds
total_length_ns = vrs_data_provider.get_last_time_ns_all_streams(TimeDomain.DEVICE_TIME) - vrs_data_provider.get_first_time_ns_all_streams(TimeDomain.DEVICE_TIME)
skip_begin_ns = int(15 * 1e9) # Skip 15 seconds
duration_ns = int(3 * 1e9) # 3 seconds
skip_end_ns = max(total_length_ns - skip_begin_ns - duration_ns, 0)
deliver_options.set_truncate_first_device_time_ns(skip_begin_ns)
deliver_options.set_truncate_last_device_time_ns(skip_end_ns)

# Plot image data, and overlay hand tracking data
for sensor_data in vrs_data_provider.deliver_queued_sensor_data(deliver_options):
    # ---------------
    # Only image data will be obtained.
    # ---------------
    device_time_ns = sensor_data.get_time_ns(TimeDomain.DEVICE_TIME)
    image_data_and_record = sensor_data.image_data_and_record()
    stream_id = sensor_data.stream_id()
    camera_label = vrs_data_provider.get_label_from_stream_id(stream_id)
    camera_calib = device_calib.get_camera_calib(camera_label)
    

    # Visualize the camera image (RGB or SLAM).
    rr.set_time("device_time", duration = device_time_ns * 1e-9)
    rr.log(f"{camera_label}", rr.Image(image_data_and_record[0].to_numpy_array()))
    
    # Query and plot interpolated hand tracking result
    interpolated_hand_pose = vrs_data_provider.get_interpolated_hand_pose_data(handtracking_stream_id, device_time_ns, TimeDomain.DEVICE_TIME)
    if interpolated_hand_pose is not None:
        plot_handpose_in_camera(hand_pose = interpolated_hand_pose, camera_label = camera_label, camera_calib = camera_calib)

# Wait for rerun to buffer 1 second of data
import time
time.sleep(1)

rr.notebook_show()