In [1]:
import zmq

ctx = zmq.Context()
# The REQ talks to Pupil remote and receives the session unique IPC SUB PORT
socket = ctx.socket(zmq.REQ)

ip = 'localhost'
port = 50020

socket.connect(f'tcp://{ip}:{port}')

# Request 'SUB_PORT' for reading data
socket.send_string('SUB_PORT')
sub_port = socket.recv_string()

# Request 'PUB_PORT' for writing data
socket.send_string('PUB_PORT')
pub_port = socket.recv_string()

socket.close()

In [2]:
def create_socket(ctx_c, ip_c, topics):
    sub = ctx_c.socket(zmq.SUB)
    sub.connect(f'tcp://{ip_c}:{sub_port}')
    for topic in topics:
        sub.subscribe(topic)
    return sub

def update_screen(gaze_point, ost):
    gaze_point = np.array(gaze_point)
    gaze_point -= ost
    gaze_point = (
        screen_width - int(gaze_point[0] * screen_width),
        screen_height - int((1 - gaze_point[1]) * screen_height)
    )
    pygame.draw.circle(screen, RED, gaze_point, dot_radius // 2)
    pygame.display.flip()

In [3]:
# Colors
WHITE = (255, 255, 255)
RED = (255, 0, 0)
BLUE = (0, 0, 255)

dot_radius = 10
screen_size = (400, 400)

In [61]:
import numpy as np

class SaccadeGestureDetector:
    def __init__(self, origin_pos, start_timestamp, dist_max=0.3, dist_min=0.1, max_monitor_time=0.0005):
        """
        Initialize the detector with origin position, timestamp, and distance thresholds.

        :param origin_pos: The origin position (x, y) for the saccade gesture.
        :param start_timestamp: The timestamp when the gesture detection started.
        :param dist_max: Maximum distance from the origin for detecting a saccade.
        :param dist_min: Minimum distance to consider the gaze returned.
        """
        self.dist_max = dist_max
        self.dist_min = dist_min
        self.start_timestamp = start_timestamp
        self.origin_pos = np.array(origin_pos)

        self.gaze_point_buffer = []
        self.direction = None
        self.gaze_outside_origin = False
        self.finished = False
        self.max_monitor_time = max_monitor_time

    def check_origin_max_distance(self, gaze_point):
        """
        Check if the gaze point is within the maximum distance from the origin.
        """
        o_dist = np.linalg.norm(np.array(gaze_point) - self.origin_pos)
        return o_dist > self.dist_max

    def check_origin_min_distance(self, gaze_point):
        """
        Check if the gaze point is outside the minimum distance from the origin.
        """
        o_dist = np.linalg.norm(np.array(gaze_point) - self.origin_pos)
        return o_dist < self.dist_min

    def update(self, gaze_point, timestamp):
        """
        Update the detector with a new gaze point and timestamp.

        :param gaze_point: The current gaze position as [x, y].
        :param timestamp: The current timestamp.
        """
        if self.finished:
            return  # Gesture detection is complete, no further processing

        if self.check_origin_max_distance(gaze_point):
            self.gaze_point_buffer.append(gaze_point)

        if not self.gaze_outside_origin:
            # Check if gaze moves outside the maximum distance
            if self.check_origin_max_distance(gaze_point):
                self.gaze_outside_origin = True  # Gaze moved outside the origin
                self.monitor_start_time = timestamp
        else:
            if timestamp - self.monitor_start_time > self.max_monitor_time:
                self.finished = True
                self.direction = None  # Gesture timed out without returning
                return

            # Check if gaze returns within the minimum distance
            if self.check_origin_min_distance(gaze_point):
                mean_point = np.mean(self.gaze_point_buffer, axis=0)

                print(mean_point, self.origin_pos)

                delta = np.array(mean_point) - np.array(self.origin_pos)
                angle = np.degrees(np.arctan2(delta[1], delta[0])) % 360

                if 45 <= angle < 135:
                    self.direction = 'up'
                elif 135 <= angle < 225:
                    self.direction = 'left'
                elif 225 <= angle < 315:
                    self.direction = 'down'
                else:
                    self.direction = 'right'

                self.finished = True
                return

    def is_finished(self):
        """
        Check if the gesture detection is finished.
        """
        return self.finished

    def get_direction(self):
        """
        Get the direction of the detected gesture.
        """
        return self.direction

In [66]:
import msgpack

gaze_socket = create_socket(ctx, ip, ['fixation'])
topic, payload = gaze_socket.recv_multipart()
message = msgpack.loads(payload)
gaze_socket.close()
print("Fixation detected at:", message['norm_pos'])
origin = message['norm_pos']
start_timestamp = message['timestamp']

saccade_detector = SaccadeGestureDetector(origin, start_timestamp, dist_max=0.2, dist_min=0.05, max_monitor_time=0.5)

gaze_socket = create_socket(ctx, ip, ['gaze.'])

while not saccade_detector.is_finished():
    topic, payload = gaze_socket.recv_multipart()
    message = msgpack.loads(payload)
    gaze_pos = message['norm_pos']
    timestamp = message['timestamp']
    saccade_detector.update(gaze_pos, timestamp)

print("Saccade gesture detected:", saccade_detector.get_direction())


Fixation detected at: [0.5703942590567914, 0.5551827900786656]
[0.37688071 0.63361739] [0.57039426 0.55518279]
Saccade gesture detected: left
