# Gaze Visualiser
This notebook takes in one files taken from the Pupil Recordings `annotations.csv` and a `offset.json`, and the data compiled from the analyser from the analyser. These are:

In [49]:
# Imports
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import random
import json
import warnings
import os

matplotlib.use('TkAgg')
warnings.filterwarnings("ignore", category=RuntimeWarning)

# File Paths
annotations_filepath = 'source/annotations.csv'
gaze_filepath = ''
offset_filepath = 'offset.json'

# Reading the offset for correct time format and finding the filename of the dat

In [50]:
def obtain_offset(filepath):
    with open(offset_filepath, 'r') as file:
        data = json.load(file)  
    return data.get('offset')

def find_gaze_csv(directory):
    for filename in os.listdir(directory):
        if filename.endswith(".csv"):
            return filename
    return None


# Creating a visual representation

Displaying the difference between both non-smoothed and smoothed versions of `norm_pos_x` and `norm_pos_y:

In [51]:
def plot_norm_positions(df):
    fig, axs = plt.subplots(1, 2, figsize=(15, 6))
    axs[0].plot(df['time'], df['norm_pos_x'], label='Original norm_pos_x')
    axs[0].plot(df['time'], df['smoothed_norm_pos_x'], label=f'Smoothed norm_pos_x (window={10})', linestyle='--')
    axs[0].set_title('Norm_pos_x')

    axs[1].plot(df['time'], df['norm_pos_y'], label='Original norm_pos_y')
    axs[1].plot(df['time'], df['smoothed_norm_pos_y'], label=f'Smoothed norm_pos_y (window={10})', linestyle='--')
    axs[1].set_title('Norm_pos_y')

    for ax in axs:
        ax.set_xlabel('Time (s)')
        ax.set_ylabel('Position')
        ax.legend()
    fig.show()

## Plotting Angular Distance agaisnt Time

Using the `angular_distance` or `smoothed_angular_distance` plot an angular distance x time grapgh:

In [52]:
# Plot angular distance data from a DataFrame against time.
def plot_angular_distance(fig, ax, df, use_smooth=False, show_both=False):

    if show_both:
        ax.plot(df['time'], df['angular_distance'], label='Gaze Angular Distance',)
        ax.plot(df['time'], df['smoothed_angular_distance'], label=f'Angular Distance', linestyle='--')
    else:
        if use_smooth:
            ax.plot(df['time'], df['smoothed_angular_distance'], label=f'Angular Distance', color='orange')
        else:
            ax.plot(df['time'], df['angular_distance'], label='Gaze Angular Distance', color='orange')

    ax.set_xlabel('Time (s)')
    ax.set_ylabel('Angular Distance (rad)', color='black')
    ax.legend(loc='lower right')
    fig.suptitle('Interception Experiment')
    fig.tight_layout()

# Defining a graph to work with throughout the script:
# combined_figure, ax3 = plt.subplots(figsize=(16, 8))
# plot_angular_distance(combined_figure, ax3, gaze_df, show_both=True)
# combined_figure.show()


In [53]:
def plot_velocity_graph(df, use_smooth=False):
    if use_smooth:
        source_velocity = 'smoothed_angular_velocity'
    else:
        source_velocity = 'angular_velocity'
    
    velocities = df[source_velocity]
    plt.figure(figsize=(18, 6))
    plt.plot(df['time'], velocities, label=f'{velocities}',)

    plt.title('Angular Velocity x Time Graph')
    plt.xlabel('Time (s)')
    plt.ylabel('Angular Velocity (rad/s)')
    # plt.ylabel('Gaze Position')
# plt.show()

## Representing Eye Movements
Within the dataframe, there is a field called `movement_type` where we identify which one of the three eye movements (Saccade, Fixations, and Smooth Pursuit) base on velocities and provided csv file from Pupil Lab.

### Visualising Regions of Saccades

In [54]:
def plot_saccades(ax, df, use_smooth=False):
    if use_smooth:
        source_distance = 'smoothed_angular_distance'
        movement_type= 'smoothed_movement_type'
    else:
        source_distance = 'angular_distance'
        movement_type = 'movement_type'

    distances = df[source_distance]
    saccade_regions = df[movement_type] == 'Saccades'

    ax.plot(df['time'], np.where(saccade_regions, distances, np.nan), color='red', label='Saccade Line')
    ax.legend(loc='lower right')




### Identifying Regions of Smooth Pursuit

In [55]:
def plot_smooth_pursuits(ax, df, use_smooth=False):
    if use_smooth:
        source_distance = 'smoothed_angular_distance'
        movement_type_column = 'smoothed_movement_type'
    else:
        source_distance = 'angular_distance'
        movement_type_column = 'movement_type'

    distances = df[source_distance]
    smooth_pursuit_regions = df[movement_type_column] == 'Smooth Pursuit'

    # Plot for Smooth Pursuit Data
    ax.plot(df['time'], np.where(smooth_pursuit_regions, distances, np.nan), color='green', label='Smooth Pursuit')
    ax.legend(loc='upper left')

### Identifying Regions of Fixation

In [56]:
def plot_fixations(ax, df, use_smooth=False):
    if use_smooth:
        source_distance = 'smoothed_angular_distance'
        movement_type= 'smoothed_movement_type'
    else:
        source_distance = 'angular_distance'
        movement_type = 'movement_type'

    distances = df[source_distance]
    fixation_regions = df[movement_type] == 'Fixation'

    ax.plot(df['time'], np.where(fixation_regions, distances, np.nan), color='blue', label='Fixation')
    ax.legend(loc='lower right')

### Combining the identification functions
This would combine each of the function and allow the user control on what they wish to present on the graph

In [57]:
def plot_data(gaze_df, use_smooth, fixations=False, smooth_pursuits=False, saccades=False):
    figure, ax = plt.subplots(figsize=(16, 8))
    plot_angular_distance(figure, ax, gaze_df, use_smooth=use_smooth)

    if fixations:
        plot_fixations(ax, gaze_df, use_smooth=use_smooth)
    
    if smooth_pursuits:
        plot_smooth_pursuits(ax, gaze_df, use_smooth=use_smooth)

    if saccades:
        plot_saccades(ax, gaze_df, use_smooth=use_smooth)

    return figure, ax

## Adding Annotations to the graph
The annotation is compiled of different events that has occured throughout the event.

`add_annotations` looks at a gieven filepath to an `annotation_csv` file and uses `labels` and `timestamp` to determine where to plot events on a Angular Distance Time Graph

In [58]:
# Generate a random RGB color tuple.
def generate_random_color():
    return (random.random(), random.random(), random.random())

# Draw lines and labels representing spawned objects and interceptions on the given axis.
def draw_objects_and_interceptions(ax, spawn_timestamps, interception_timestamps, annotations_df, obstacle_ids):
    object_colors = {}

    # Handle Spawning annotations
    for timestamp, obj_id in zip(spawn_timestamps, annotations_df.loc[annotations_df['label'] == ('Spawning'), 'id']):

        # Checks if the current annotation is refering to an Obstacle objectType by filtering the annotations_df for the Object ID and retrieving the first 'objectType' (which there should only be one)
        obj_type = annotations_df.loc[(annotations_df['label'] == 'Spawning') & (annotations_df['id'] == obj_id), 'objectType'].values[0] if 'objectType' in annotations_df.columns else None
        if obj_type == 'Obstacle' and obj_id in obstacle_ids:
            draw_object_line(ax, timestamp, obj_id, object_colors, is_interception=False)

    # Handle Interception annotations
    for timestamp, obj_id in zip(interception_timestamps, annotations_df.loc[annotations_df['label'] == 'Intercepted', 'id']):
        obj_type = annotations_df.loc[(annotations_df['label'] =='Intercepted') & (annotations_df['id'] == obj_id),'objectType'].values[0] if 'objectType' in annotations_df.columns else None
        # There is a chance for an interception to be missed, so if there exists no records for the ID with label 'Intercepted' then there would be no object type to use, therefore don't need to plot
        
        if obj_type == 'Obstacle' and obj_id in obstacle_ids:
            draw_object_line(ax, timestamp, obj_id, object_colors, is_interception=True)

    return object_colors

# Draw vertical lines on the given axis to represent the start and end of an experiment and scale the x axis to those timestamps
def draw_experiment_lines(ax, start_timestamp, end_timestamp):
    ax.axvline(x=start_timestamp, color='black', linestyle='--')
    ax.axvline(x=end_timestamp, color='black', linestyle='--')
    ax.set_xlim(start_timestamp, end_timestamp)

# Draw a vertical line on the given axis to represent an object and add a text label.
def draw_object_line(ax, timestamp, obj_id, object_colors, is_interception):
    # Assign a random RGB color if ID doesn't have one
    object_colors.setdefault(obj_id, generate_random_color())

    # Get the line color
    line_color = object_colors[obj_id]

    # Determine the vertical alignment and position
    vertical_alignment = 'top' if is_interception else 'bottom'
    vertical_position = ax.get_ylim()[0] + 0.02 if vertical_alignment == 'bottom' else ax.get_ylim()[1] - 0.02

    # Plot a vertical line
    ax.axvline(x=timestamp, color=line_color, linestyle='--', alpha=0.7)

    # Add text label
    label_text = f'{"Intercepted" if is_interception else "Spawned"} {int(obj_id)} at {timestamp:.2f}'
    ax.text(timestamp, vertical_position, label_text, rotation=90, va=vertical_alignment, ha='right', color='black')


# Plot regions on the given axis, using colors based on the object_colors dictionary for each object ID.
def plot_observations(ax, annotations_df, object_colours, fill_threshold=1.0):
    looking_at_df = annotations_df.loc[annotations_df['label'] == 'Looking At'].copy()

    # Create an empty dictionary to store the regions for each object ID
    id_region_dict = {}

    # Initialise variables for tracking consecutive points
    current_obj_id = None
    start_time = None

    # Iterate through each row in the 'Looking At' dataframe
    for _, row in looking_at_df.iterrows():
        obj_id = row['id']
        timestamp = row['time']

        # Check if it's the same object and within the threshold seconds
        if obj_id == current_obj_id and start_time is not None and timestamp - start_time <= fill_threshold: 
            end_time = timestamp # Update the end time for the current region
        else:
            # A new region is created when the next ID does not match the current region's ID OR if the time between two points is greater than the threshold (indicating it's a new region for the same ID)
            if current_obj_id is not None and start_time is not None:
                if current_obj_id not in id_region_dict: # If an ID has not been given regions, this will make sure that it has been intialised before we can add new regions
                    id_region_dict[current_obj_id] = [] 
                id_region_dict[current_obj_id].append((start_time, end_time)) # Adds this region to the key with the Object ID

            # Update tracking variables for the next iteration
            current_obj_id = obj_id
            start_time = timestamp
            end_time = timestamp

    # Add the last region after the loop
    if current_obj_id is not None and start_time is not None:
        if current_obj_id not in id_region_dict:
            id_region_dict[current_obj_id] = []
        id_region_dict[current_obj_id].append((start_time, end_time))

    # Iterates through each key-value pair
    for obj_id, regions in id_region_dict.items():
        color = object_colours[obj_id] # Retrieves the colour of the current object from the dictionary

        for region in regions: # Each region should have a start time, and an end time. We iterate through each one whiles also colouring the region between them
            start_time, end_time = region
            ax.axvspan(start_time, end_time, color=color, alpha=0.2, label=f'Object ID {obj_id}')

# Add annotations to a given plot.
def add_annotations(ax, offset, filepath='annotations.csv', show_observable=False):
    if filepath:
        # Creates a DataFrame containing all the annotation data, and also creating a new field called 'Time' which format timestamps into the corresponding time of the recording
        annotations_df = pd.read_csv(filepath)
        annotations_df['time'] = annotations_df['timestamp'] + offset
        annotations_df['time'] -= annotations_df['time'].min()

        # Filter annotations for 'Spawning' or 'Intercepted' labels and ObjectType 'Obstacle'
        filtered_annotations = annotations_df[
            (annotations_df['label'].isin(['Spawning', 'Intercepted'])) &
            (annotations_df['objectType'] == 'Obstacle')
        ]

        # Identifying different types of annotation to plot on the graph
        spawn_timestamps = filtered_annotations.loc[filtered_annotations['label'] == 'Spawning', 'time'].values
        interception_timestamps = filtered_annotations.loc[filtered_annotations['label'] == 'Intercepted', 'time'].values

        start_timestamp = annotations_df.loc[(annotations_df['label'] == 'Experiment Started'), 'time'].values
        end_timestamp = annotations_df.loc[(annotations_df['label'] == 'Experiment Ended'), 'time'].values

        draw_experiment_lines(ax, start_timestamp, end_timestamp) # Draws a line for when the experiment has started and ended, as well as adjusting the x-axis scale to show only between those points

        # Extract ObjectType 'Obstacle' and ID for plotting
        obstacle_ids = filtered_annotations.loc[:, 'id'].values

        # Each object should be given a colour for their specific ID, this can be used for when we want to show what object the user is currently observing
        object_colors = draw_objects_and_interceptions(ax, spawn_timestamps, interception_timestamps, annotations_df, obstacle_ids)
        
        # This will show regions of which indicates where the user looks at depending on the annotations with the label 'Looking At'
        if show_observable:
            plot_observations(ax, annotations_df, object_colors)

## Displaying outcome of identifications and annotations:

In [60]:
# Retrieve the filepath of the csv file containing the data needed
gaze_filepath = find_gaze_csv(os.getcwd())

# Creates a DataFrame base on the csv in that filepath
gaze_df = pd.read_csv(gaze_filepath)

# Obtain the offset that helps format the annotation file
offset = obtain_offset(offset_filepath)

# Plot the eye data with required designs
figure, ax = plot_data(gaze_df, use_smooth=True, fixations=True, saccades=True, smooth_pursuits=True)

# Add the annotations to the graph
add_annotations(ax, offset, annotations_filepath, True)

# Display the graph
figure.show()

plot_norm_positions(gaze_df)
