Skip to content

Commit

Permalink
EyeTribe: Log time bug fixed; decreased sample lag
Browse files Browse the repository at this point in the history
Several improvements and additions:

1) The log time is no longer copied from the most recent sample, but
instead calculated at the time of the log. PC and EyeTribe clock
asynchrony is calculated at every new incoming sample, so it shouldn't
be too inaccurate.

2) The most recent sample (returned by the `sample` and `pupil_size`
methods) is no longer set in the sample processing Thread, but directly
as it comes in. This should fix (part of) the lag that some users were
experiencing.
  • Loading branch information
esdalmaijer committed Jun 22, 2016
1 parent 55bcec7 commit f1050a8
Showing 1 changed file with 203 additions and 18 deletions.
221 changes: 203 additions & 18 deletions pygaze/_eyetracker/pytribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,29 @@
# author: Edwin Dalmaijer
# email: edwin.dalmaijer@psy.ox.ac.uk
#
# version 3 (11-Aug-2014)
# version 4 (21-Jun-2016)

import os
import copy
import json
import time
import socket
import codecs
from threading import Thread, Lock
from multiprocessing import Queue
from threading import Lock, Thread
from multiprocessing import Event, Process, Queue
from pygaze.py3compat import *


# # # # #
# EYETRIBE CLASS

# The original EyeTribe class from earlier versions of PyTribe.
class EyeTribe:

"""class for eye tracking and data collection using an EyeTribe tracker
"""

def __init__(self, logfilename='default'):
def __init__(self, logfilename='default', host='localhost', port=6555):

"""Initializes an EyeTribe instance
Expand All @@ -31,15 +35,15 @@ def __init__(self, logfilename='default'):
a full path to it's location and an extension
(default = 'default.txt')
"""

# initialize data collectors
self._logfile = codecs.open('%s.tsv' % (logfilename), 'w', u'utf-8')
self._separator = u'\t'
self._log_header()
self._queue = Queue()

# initialize connection
self._connection = connection(host='localhost',port=6555)
self._connection = connection(host=host, port=port)
self._tracker = tracker(self._connection)
self._heartbeat = heartbeat(self._connection)

Expand All @@ -57,14 +61,16 @@ def __init__(self, logfilename='default'):
self._streaming = True
self._samplefreq = self._tracker.get_framerate()
self._intsampletime = 1.0 / self._samplefreq
self._clockdiff = None
self._newestframe = self._tracker.get_frame()
self._ssthread = Thread(target=self._stream_samples, args=[self._queue])
self._ssthread.daemon = True
self._ssthread.name = 'samplestreamer'

# initialize data processer
self._processing = True
self._logdata = False
self._currentsample = self._tracker.get_frame()
self._currentsample = copy.deepcopy(self._newestframe)
self._dpthread = Thread(target=self._process_samples, args=[self._queue])
self._dpthread.daemon = True
self._dpthread.name = 'dataprocessor'
Expand Down Expand Up @@ -110,16 +116,20 @@ def log_message(self, message):
"""Logs a message to the logfile, time locked to the most recent
sample
"""

# timestamp, based on the most recent sample
if self._currentsample != None:
ts = self._currentsample['timestamp']
t = self._currentsample['time']

# Get the current time.
t = time.time()
# Make a string in the specific format that the EyeTribe uses:
# yyyy-mm-dd HH:MM:SS.000
ts = '%s.%d' % (time.strftime('%Y-%m-%d %H:%M:%S'), round(t % 1, 3)*1000)

# Correct the time to EyeTribe time
if self._clockdiff != None:
t = int(t + self._clockdiff)
else:
ts = ''
t = ''
# assemble line
line = self._separator.join(map(str,[u'MSG',ts,t, safe_decode(message)]))
line = self._separator.join(map(str,[u'MSG', ts, t, safe_decode(message)]))
# write message
self._logfile.write(line + u'\n') # to internal buffer

Expand All @@ -137,10 +147,10 @@ def sample(self):
gaze -- a (x,y) tuple indicating the point of regard
"""

if self._currentsample == None:
if self._newestframe == None:
return None, None
else:
return (self._currentsample['avgx'],self._currentsample['avgy'])
return (self._newestframe['avgx'],self._newestframe['avgy'])

def pupil_size(self):

Expand All @@ -159,7 +169,7 @@ def pupil_size(self):
if self._currentsample == None:
return None
else:
return self._currentsample['psize']
return self._newestframe['psize']

def close(self):

Expand Down Expand Up @@ -240,10 +250,15 @@ def _stream_samples(self, queue):
self._lock.acquire(True)
# get a new sample
sample = self._tracker.get_frame()
t1 = time.time()
# put the sample in the Queue
queue.put(sample)
# release the Threading Lock
self._lock.release()
# Update the newest frame
self._newestframe = copy.deepcopy(sample)
# Calculate the clock difference
self._clockdiff = sample['time'] - t1
# pause for half the intersample time, to avoid an overflow
# (but to make sure to not miss any samples)
time.sleep(self._intsampletime/2)
Expand Down Expand Up @@ -335,8 +350,178 @@ def _log_header(self):
self._firstlog = False



# # # # # #
# PARALLEL ClASS


# Ugly, but sod it: A global variable for the most recent sample.
global _current_sample


# Class to communicate with an EyeTribe tracker. The actual communications
# and logging actually run in a separate Process. This class just sends
# commands to that Process.
class ParallelEyeTribe:

def __init__(self, logfilename='default'):

# Set some standard stuff (hard coded now, but can potentially
# be passed to the __init__ method in the future.)
host = 'localhost'
port = 6555

# We need an Event that signals whether the connection to the
# EyeTribe is supposed to be open.
self._connection_alive = Event()
self._connection_alive.set()

# We also need a Queue to send commands through.
self._command_queue = Queue()
# And we need a Queue to receive commands through.
self._to_main_queue = Queue()

# Start a parallel process that will take care of all EyeTribe
# things. It will provide regular heartbeats to keep the connection
# alive, it will record gaze data to a file, and it will keep the
# most recent sample updated. This is all done in a separate
# Process (rather than in Threads) to so that it can be offloaded
# to a different CPU core. This prevents the ongoing experiment
# (or whatever you're doing in the main Thread) from interfering
# with the processing (and recording) of gaze data.
self.eyetribe_process = Process(target=_run_eyetribe_process, \
args=[logfilename, host, port, self._connection_alive, \
self._command_queue])
self.eyetribe_process.name = u'pygaze_eyetribe'
self.eyetribe_process.daemon = True
self.eyetribe_process.start()

def start_recording(self):

"""Starts data recording
"""

# Send a command to the EyeTribe Process
self._command_queue.put(('start_recording', ()))

def stop_recording(self):

"""Stops data recording
"""

# Send a command to the EyeTribe Process
self._command_queue.put(('stop_recording', ()))

def log_message(self, message):

"""Logs a message to the logfile, time locked to the most recent
sample
"""

# Send a command to the EyeTribe Process
self._command_queue.put(('log_message', (message)))

def sample(self):

"""Returns the most recent point of regard (=gaze location on screen)
coordinates (smoothed signal)
arguments
None
returns
gaze -- a (x,y) tuple indicating the point of regard
"""

global _current_sample

if _current_sample == None:
return None, None
else:
return (_current_sample['avgx'], _current_sample['avgy'])

def pupil_size(self):

"""Returns the most recent pupil size sample (an average of the size
of both pupils)
arguments
None
returns
pupsize -- a float indicating the pupil size (in arbitrary units)
"""

global _current_sample

if _current_sample == None:
return None
else:
return _current_sample['psize']

def close(self):

"""Stops all data streaming, and closes both the connection to the
tracker and the logfile
"""

# Send a command to the EyeTribe Process
self._command_queue.put(('close', ()))


# Function that can run in a parallel process, to keep a connection with the
# EyeTribe open, and to log data when appropriate.
def _run_eyetribe_process(logfilename, host, port, connection_alive, command_queue):

"""FOR INTERNAL USE ONLY
"""

# Ugly, but sod it: A global variable for the most recent sample.
global _current_sample

# Initialise a new _EyeTribe instance to open the connection to the
# EyeTribe.
tracker = EyeTribe(logfilename=logfilename, host=host, port=port)

# Run until the connection is closed.
while connection_alive.is_set():

# Check the incoming Queue.
if not command_queue.empty():
# Get the oldest command in the Queue. This is a tuple that
# contains a string (the command), and a tuple of values
# (what they are depends on the specific command).
cmd, value = command_queue.get()

# Start recording.
if cmd == 'start_recording':
tracker.start_recording()

# Stop recording.
elif cmd == 'stop_recording':
tracker.stop_recording()

# Log a message.
elif cmd == 'log_message':
tracker.log_message(value[0])

# Close the connection to the EyeTribe.
elif cmd == 'close':
tracker.close()
# Unset the Event that signals that the connection is
# alive.
connection_alive.clear()

# Update the current sample.
_current_sample = copy.deepcopy(tracker._currentsample)


# # # # #
# low-level classes
# SUPPORTING CLASSES

class connection:

Expand Down

0 comments on commit f1050a8

Please sign in to comment.