Data Analysis with PyGazeAnalyzer 1.0

In [3]:
import numpy


def remove_missing(x, y, time, missing):
    """Drop every sample whose x AND y coordinate both equal ``missing``.

    A sample is only discarded when both coordinates carry the missing
    marker; a sample with a single missing coordinate is kept.

    arguments

    x        -    pandas Series or numpy array of x positions
    y        -    pandas Series or numpy array of y positions
    time     -    pandas Series or numpy array of timestamps
    missing  -    value that marks a missing coordinate

    returns
    x, y, time
             the filtered inputs. pandas Series are renumbered from 0 so
             that label access (``x[i]``) matches positional access;
             numpy arrays are returned as filtered arrays.
    """
    # Build the mask as a plain boolean array so it can index both Series
    # and ndarrays, independent of any Series index labels.
    both_missing = numpy.asarray(x == missing) & numpy.asarray(y == missing)
    keep = ~both_missing

    def _select(values):
        kept = values[keep]
        # Boolean filtering keeps the original labels on a Series; the
        # detection loops index by position, so renumber from 0.
        if hasattr(kept, "reset_index"):
            return kept.reset_index(drop=True)
        return kept

    return _select(x), _select(y), _select(time)


def fixation_detection_fixed(x, y, time, missing=0.0, maxdist=25, mindur=50):
    """Detects fixations, defined as consecutive samples that stay within
    a set distance (in pixels) of the sample at which the fixation started
    (disregarding missing data)

    arguments

    x        -    x positions (any sequence type accepted by remove_missing)
    y        -    y positions (same type as x)
    time     -    timestamps in milliseconds (same type as x)

    keyword arguments

    missing  -    value to be used for missing data (default = 0.0)
    maxdist  -    maximal distance in pixels between a sample and the
                  fixation's first sample (default = 25)
    mindur   -    minimal duration of a fixation in milliseconds; detected
                  fixation candidates will be disregarded if they are below
                  this duration (default = 50)

    returns
    Sfix, Efix
             Sfix    -    list of lists, each containing [starttime]
             Efix    -    list of lists, each containing
                          [starttime, endtime, duration, x, y] where x, y
                          are the coordinates of the fixation's first sample
    """
    x, y, time = remove_missing(x, y, time, missing)

    # containers for fixation starts and completed fixations
    Sfix = []
    Efix = []

    # si is the reference sample: the first sample of the running fixation,
    # or (while no fixation is running) the sample just before i
    si = 0
    fixstart = False
    for i in range(1, len(x)):
        # Euclidean distance from the reference sample to the current one
        squared_distance = (x[si]-x[i])**2 + (y[si]-y[i])**2
        dist = squared_distance**0.5 if squared_distance > 0 else 0.0

        if dist <= maxdist and not fixstart:
            # start a new fixation at the current sample
            si = i
            fixstart = True
            Sfix.append([time[i]])
        elif dist > maxdist and fixstart:
            # the gaze left the fixation area: the previous sample ends it
            fixstart = False
            duration = time[i-1] - Sfix[-1][0]
            if duration >= mindur:
                Efix.append([Sfix[-1][0], time[i-1], duration, x[si], y[si]])
            else:
                # too short: forget the start that was recorded for it
                Sfix.pop(-1)
            si = i
        elif not fixstart:
            si += 1

    # A fixation that is still running when the data ends is closed at the
    # last sample. It has to satisfy mindur like every other fixation;
    # otherwise e.g. a fixation starting on the final sample would be
    # reported with a duration of 0 ms.
    if fixstart:
        last = len(x) - 1
        duration = time[last] - Sfix[-1][0]
        if duration >= mindur:
            Efix.append([Sfix[-1][0], time[last], duration, x[si], y[si]])
        else:
            Sfix.pop(-1)
    return Sfix, Efix


Saccade Metrics

In [4]:

def saccade_detection_fixed(x, y, time, missing=0.0, minlen=5, maxvel=40, maxacc=340):
    """Detects saccades, defined as consecutive samples with an inter-sample
    velocity or acceleration above the given thresholds

    arguments

    x        -    x positions
    y        -    y positions
    time     -    tracker timestamps in milliseconds

    keyword arguments

    missing  -    value to be used for missing data (default = 0.0)
    minlen   -    minimal duration of a saccade in milliseconds; shorter
                  candidates are ignored (default = 5)
    maxvel   -    velocity threshold in pixels/second (default = 40)
    maxacc   -    acceleration threshold, compared against the
                  sample-to-sample change in velocity (default = 340)

    returns
    Ssac, Esac
             Ssac    -    list of lists, each containing [starttime]
             Esac    -    list of lists, each containing
                          [starttime, endtime, duration, startx, starty, endx, endy]
    """
    x, y, time = remove_missing(x, y, time, missing)

    Ssac = []
    Esac = []

    # inter-sample distance (pixels) and inter-sample time (seconds)
    dx = numpy.diff(x)
    dy = numpy.diff(y)
    intdist = (dx**2 + dy**2)**0.5
    inttime = numpy.diff(time) / 1000.0

    # velocity between samples, and its sample-to-sample difference
    vel = intdist / inttime
    acc = numpy.diff(vel)

    n_samples = len(time)
    t0i = 0
    while True:
        # a saccade starts where velocity OR acceleration exceeds its threshold
        above = (vel[1+t0i:] > maxvel) | (acc[t0i:] > maxacc)
        sacstarts = numpy.where(above)[0]
        if len(sacstarts) == 0:
            break

        t1i = t0i + sacstarts[0] + 1
        if t1i >= n_samples - 1:
            t1i = n_samples - 2
        t1 = time[t1i]
        Ssac.append([t1])

        # it ends where velocity AND acceleration are both below threshold
        below = (vel[1+t1i:] < maxvel) & (acc[t1i:] < maxacc)
        sacends = numpy.where(below)[0]
        if len(sacends) == 0:
            # no end found: the start recorded above stays in Ssac
            break

        t2i = sacends[0] + 1 + t1i + 2
        if t2i >= n_samples:
            t2i = n_samples - 1
        t2 = time[t2i]
        dur = t2 - t1

        if dur >= minlen:
            Esac.append([t1, t2, dur, x[t1i], y[t1i], x[t2i], y[t2i]])
        else:
            # too short: drop the start that was just recorded
            Ssac.pop(-1)

        # continue searching after the end of this saccade
        t0i = t2i

    return Ssac, Esac


In [5]:
import os
import pandas as pd
import xml.etree.ElementTree as ET

def call_fixation_detection_on_data(fixation_info, participant, time, x, y, task = 0, missing=0.0, maxdist=25, mindur=50):
    """Run fixation detection on one recording and append its summary row.

    The fixation count, the summed fixation duration and the mean fixation
    duration are appended to the matching lists in ``fixation_info``,
    together with ``participant`` and ``task``. ``missing``, ``maxdist`` and
    ``mindur`` are passed through to ``fixation_detection_fixed``.
    """
    _, fixations = fixation_detection_fixed(x, y, time, missing=missing, maxdist=maxdist, mindur=mindur)

    # each entry is [starttime, endtime, duration, ...]; index 2 is the duration
    fixation_count = len(fixations)
    total_duration = sum(entry[2] for entry in fixations)
    if fixation_count > 0:
        average_duration = total_duration / fixation_count
    else:
        average_duration = 0

    row = {
        'Participant': participant,
        'Task': task,
        'Fixation Count': fixation_count,
        'Total Fixation Duration [ms]': total_duration,
        'Average Fixation Duration [ms]': average_duration,
    }
    for column, value in row.items():
        fixation_info[column].append(value)

def call_saccade_detection_on_data(saccade_info, participant, time, x, y, task = 0, missing=0.0, minlen=5, maxvel=40, maxacc=340):
    """Run saccade detection on one recording and append its summary row.

    The saccade count, summed duration, mean duration and mean amplitude
    are appended to the matching lists in ``saccade_info``, together with
    ``participant`` and ``task``.

    keyword arguments (defaults equal the previously hard-coded values)

    missing  -    value that marks missing data (default = 0.0)
    minlen   -    minimal saccade duration in milliseconds (default = 5)
    maxvel   -    velocity threshold in pixels/second (default = 40)
    maxacc   -    acceleration threshold (default = 340)
    """
    Ssac, Esac = saccade_detection_fixed(x, y, time, missing=missing, minlen=minlen, maxvel=maxvel, maxacc=maxacc)

    # Each Esac entry is
    # [starttime, endtime, duration, startx, starty, endx, endy].
    saccade_count = len(Esac)
    total_duration = sum(sac[2] for sac in Esac)
    if saccade_count > 0:
        average_duration = total_duration / saccade_count
        # straight-line distance between the start and end position
        average_distance = sum(((sac[3] - sac[5])**2 + (sac[4] - sac[6])**2)**0.5 for sac in Esac) / saccade_count
    else:
        average_duration = 0
        average_distance = 0

    saccade_info['Participant'].append(participant)
    saccade_info['Task'].append(task)
    saccade_info['Saccade Count'].append(saccade_count)
    saccade_info['Total Saccade Duration [ms]'].append(total_duration)
    saccade_info['Average Saccade Duration [ms]'].append(average_duration)
    saccade_info['Average Saccade Distance [px]'].append(average_distance)


'''
def prepare_tobii_data(directory_path, file_name, fixation_info, fn):
	tsv_file = os.path.join(directory_path, file_name)

	# Load the Tobii eye tracker data into a Pandas DataFrame
	df = pd.read_csv(tsv_file, delimiter='\t', low_memory=False)
	df = df[['Gaze point X [DACS px]', 'Gaze point Y [DACS px]', 'Recording timestamp [ms]']]
	df = df.fillna(0.0)

	# Define parameters for fixation detection
	x = df['Gaze point X [DACS px]']  # X-coordinate data
	y = df['Gaze point Y [DACS px]']  # Y-coordinate data
	time = df['Recording timestamp [ms]']

	fn(fixation_info, os.path.splitext(file_name)[0], time, x, y)
      
'''

def _tobii_right_eye_samples(segment):
    """Return (time_ms, x, y) for the rows of *segment* with a valid right eye."""
    valid = segment[segment['R Validity'] == 1]
    # Re-base the timestamps on the first valid sample, then convert them
    # from microseconds to milliseconds.
    time = (valid['Time'] - valid['Time'].min()) / 1000
    return time, valid['R POR X [px]'], valid['R POR Y [px]']


def prepare_tobii_data(directory_path, file_name, fixation_info, fn, parameters = {'missing': 0.0, 'maxdist': 25, 'mindur': 50}):
    """Split one raw-data TSV file into per-task segments and analyze each.

    Rows of type 'MSG' whose message names a '.jpg' stimulus mark the start
    of a task. Every segment between two such markers is reduced to the
    samples with a valid right eye and handed to ``fn``; the segment after
    the last marker is always analyzed.

    arguments

    directory_path  -    directory that contains the file
    file_name       -    file name; the part before the first '_' is used
                         as participant id
    fixation_info   -    result dictionary that is passed through to fn
    fn              -    callable invoked as
                         fn(fixation_info, participant_id, time, x, y,
                            task_name, **parameters)
    parameters      -    detection thresholds, forwarded to fn by keyword

    raises
    ValueError      -    if none of the known header offsets yields the
                         expected columns
    """
    tsv_file = os.path.join(directory_path, file_name)
    print("prepare_tobii_data", tsv_file)

    wanted_columns = ['Time', 'Type', 'L Raw X [px]', 'L Raw Y [px]', 'R Raw X [px]', 'R Raw Y [px]', 'L Validity', 'R Validity', 'R POR X [px]', 'R POR Y [px]']

    # The number of leading comment lines differs between files, so try the
    # known offsets until the expected column header is found.
    df = None
    last_error = None
    for skipped_rows in (37, 32, 41, 45):
        try:
            df = pd.read_csv(tsv_file, delimiter='\t', low_memory=False, on_bad_lines='skip', skiprows=skipped_rows)[wanted_columns]
        except (KeyError, pd.errors.EmptyDataError) as e:
            # wrong offset (header not where expected) or offset past the
            # end of the file: try the next candidate
            last_error = e
        else:
            print("possible skipped rows worked", skipped_rows)
            break
    if df is None:
        print("no possible skipped rows worked", last_error)
        raise ValueError(f"could not locate the column header in {tsv_file}") from last_error

    # work on a real copy so the column assignment below is not a write
    # into a slice of the parsed frame
    df = df.copy()
    df['Type'] = df['Type'].astype(str)
    df = df.fillna(0.0)

    # instruction screens are not tasks and are skipped inside the loop
    removed_task_names = {'instruction_calibration.jpg', 'instruction_comprehension.jpg'}
    participant_id = file_name.split('_')[0]

    def analyze(segment, task_name):
        time, x, y = _tobii_right_eye_samples(segment)
        fn(fixation_info, participant_id, time, x, y, task_name, **parameters)

    last_task_name = None
    last_task_row_number = 0
    for msg_row in df.index[df['Type'] == 'MSG']:
        # for message rows the text is stored in the 'L Raw X [px]' column
        parts = str(df.at[msg_row, 'L Raw X [px]']).split('Message: ')
        if len(parts) < 2 or '.jpg' not in parts[1]:
            continue
        if last_task_name is not None and last_task_name not in removed_task_names:
            analyze(df.iloc[last_task_row_number:msg_row], last_task_name)
        last_task_name = parts[1]
        last_task_row_number = msg_row

    # Everything after the last stimulus marker belongs to the last task.
    # NOTE: as before, this segment is analyzed unconditionally (even when
    # no marker was found), but it now receives the same parameters as
    # every other segment.
    analyze(df.iloc[last_task_row_number:], last_task_name)


			

def prepare_csv_data(directory_path, file_name, fixation_info, fn, parameters = {'missing': 0.0, 'maxdist': 25, 'mindur': 50}):
    """Load one gaze CSV file and hand its samples to ``fn``.

    The file name is expected to look like ``T<task>-<participant>-...``:
    the first '-'-separated part without its leading character is the task
    number, the second part is the participant id.

    arguments

    directory_path  -    directory that contains the file
    file_name       -    name of the CSV file
    fixation_info   -    result dictionary that is passed through to fn
    fn              -    callable invoked as
                         fn(fixation_info, participant_id, time, x, y,
                            task_number, **parameters)
    parameters      -    detection thresholds, forwarded to fn by keyword
    """
    csv_file = os.path.join(directory_path, file_name)

    name_parts = file_name.split('-')
    task_number = name_parts[0][1:]
    participant_id = name_parts[1]

    df = pd.read_csv(csv_file, delimiter=',', low_memory=False, on_bad_lines='skip', dtype = {'x': float, 'y': str, 'time': float})

    # The time column's name starts with 'TIME(' followed by a
    # file-specific suffix, so it is looked up by that prefix.
    time_columns = df.filter(like='TIME(')
    time_name = time_columns.columns[0]

    # Missing values become 0.0, which is the default "missing" marker of
    # the detection functions.
    # FPOGX / FPOGY are multiplied by a hard-coded 1920x1080 screen size to
    # get pixels; the time column is multiplied by 1000 to get milliseconds.
    x = df['FPOGX'].fillna(0.0) * 1920
    y = df['FPOGY'].fillna(0.0) * 1080
    time = time_columns[time_name].fillna(0.0) * 1000

    # Forward the thresholds by keyword so fixation and saccade callbacks
    # each receive the parameter names they declare.
    fn(fixation_info, participant_id, time, x, y, task_number, **parameters)


def prepare_xml_data(directory_path, file_name, fixation_info, fn, parameters = {'missing': 0.0, 'maxdist': 25, 'mindur': 50}):
    """Load the gaze samples of one 'eclipse' XML file and hand them to ``fn``.

    Files whose name does not contain 'eclipse' are ignored. The
    participant id is the third-to-last component of ``directory_path``;
    the task is the third '-'-separated part of the second-to-last
    component.

    arguments

    directory_path  -    directory that contains the file
    file_name       -    name of the XML file
    fixation_info   -    result dictionary that is passed through to fn
    fn              -    callable invoked as
                         fn(fixation_info, participant_id, time, x, y,
                            task, **parameters)
    parameters      -    detection thresholds, forwarded to fn by keyword
    """
    # We only want the eclipse xml files as those have the x and y coordinates
    if 'eclipse' not in file_name:
        return

    xml_file = os.path.join(directory_path, file_name)

    path_elements = directory_path.split(os.sep)
    participant_id = path_elements[-3]
    task = path_elements[-2].split('-')[2]

    root = ET.parse(xml_file).getroot()

    # Event time is in nanoseconds, so we divide by 1000000 to get milliseconds
    samples = [
        {
            "event_time": int(response_elem.get("event_time")) / 1000000,
            "x": float(response_elem.get("x")),
            "y": float(response_elem.get("y")),
        }
        for response_elem in root.findall(".//response")
    ]

    # Naming the columns explicitly keeps them present when the file has no
    # <response> elements, so fn receives empty series instead of the
    # column lookup failing with a KeyError.
    df = pd.DataFrame(samples, columns=["event_time", "x", "y"])

    # Forward the thresholds by keyword so fixation and saccade callbacks
    # each receive the parameter names they declare.
    fn(fixation_info, participant_id, df['event_time'], df['x'], df['y'], task, **parameters)


def prepare_txt_data(directory_path, file_name, fixation_info, fn, parameters = {'missing': 0.0, 'maxdist': 25, 'mindur': 50}):
    """Load one 'ogama' text export and hand the samples of each image to ``fn``.

    Files whose name does not contain 'ogama' are ignored. The participant
    id is the name of the directory that contains the file. Rows whose
    ' Included?' value is not 'Y' are dropped, and ``fn`` is called once
    per distinct ' ImageName'.

    arguments

    directory_path  -    directory that contains the file (a trailing path
                         separator is tolerated)
    file_name       -    name of the text file (UTF-16, comma separated)
    fixation_info   -    result dictionary that is passed through to fn
    fn              -    callable invoked as
                         fn(fixation_info, participant_id, time, x, y,
                            image_name, **parameters)
    parameters      -    detection thresholds, forwarded to fn by keyword
    """
    # We only want the ogama txt files as those have the x and y coordinates
    if 'ogama' not in file_name:
        return
    txt_file = os.path.join(directory_path, file_name)

    # normpath drops a trailing separator, which would otherwise make the
    # last path component (and so the participant id) an empty string
    participant_id = os.path.basename(os.path.normpath(directory_path))

    df = pd.read_csv(txt_file, delimiter=',', low_memory=False, on_bad_lines='skip', encoding = "utf-16")
    # .copy() so the rescaling below writes to an independent frame
    df = df[[' ImageName', ' X', ' Y', ' StartTime', ' Included?', ' StimulusType']].copy()

    # rescale the coordinates from a 1024x768 to a 1920x1080 pixel grid
    df[' X'] = df[' X'] * 1920 / 1024
    df[' Y'] = df[' Y'] * 1080 / 768

    # Remove all rows where Included? is not Y
    df = df[df[' Included?'] == 'Y']

    df = df.fillna(0.0)

    # one analysis per image, in order of first appearance
    for image_name in df[' ImageName'].unique():
        image_df = df[df[' ImageName'] == image_name]
        # Forward the thresholds by keyword so fixation and saccade
        # callbacks each receive the parameter names they declare.
        fn(fixation_info, participant_id, image_df[' StartTime'], image_df[' X'], image_df[' Y'], image_name, **parameters)
	

# Dispatch table: file extension -> loader that reads that kind of file and
# forwards its samples to a detection callback.
ending_to_function = {}
ending_to_function['.tsv'] = prepare_tobii_data
ending_to_function['.csv'] = prepare_csv_data
ending_to_function['.xml'] = prepare_xml_data
ending_to_function['.txt'] = prepare_txt_data

# Empty result table used for manual test runs of a single loader.
test_fixation_info = {
    column: []
    for column in (
        'Participant',
        'Task',
        'Fixation Count',
        'Total Fixation Duration [ms]',
        'Average Fixation Duration [ms]',
    )
}



#prepare_txt_data('data\\4\\formatted-raw-data\\151\\', 'ogama.txt', test_fixation_info, call_fixation_detection_on_data)
#print(pd.DataFrame(test_fixation_info))

In [6]:
import os
import pandas as pd


def fixation_data_analysis(directory_path, output_csv = "pygaze_fixations.csv", parameters = {'missing': 0.0, 'maxdist': 25, 'mindur': 50}):
    """Run fixation detection on every supported file below a directory.

    The directory tree is walked recursively. Each file whose extension is
    a key of ``ending_to_function`` is handed to the matching loader
    together with ``call_fixation_detection_on_data``. The collected
    summary is printed and written to ``results/<output_csv>``.

    Parameters:
        directory_path (str): Root directory that contains the recordings.
        output_csv (str): File name of the CSV written below ``results/``.
        parameters (dict): Detection thresholds ('missing', 'maxdist',
            'mindur'), passed unchanged to each loader.

    Returns:
        pd.DataFrame: One row per analyzed recording/task.
    """
    fixation_info = {
        'Participant': [],
        'Task': [],
        'Fixation Count': [],
        'Total Fixation Duration [ms]': [],
        'Average Fixation Duration [ms]': []
    }

    # Visit every file in the directory and its subfolders
    for directory, _subdirs, files in os.walk(directory_path):
        for file_name in files:
            ending = os.path.splitext(file_name)[1]
            if ending in ending_to_function:
                ending_to_function[ending](directory, file_name, fixation_info, call_fixation_detection_on_data, parameters)

    count_df = pd.DataFrame(fixation_info)

    output_csv = os.path.join("results/", output_csv)
    # to_csv does not create missing directories, so make sure the target
    # directory exists before writing
    os.makedirs(os.path.dirname(output_csv), exist_ok=True)
    count_df.to_csv(output_csv, index=False)

    print(count_df)

    print(f"Fixation information saved to {output_csv}")
    return count_df
  


In [7]:
import os
import pandas as pd


def saccade_data_analysis(directory_path, output_csv="pygaze_saccades.csv", parameters={'missing': 0.0, 'minlen': 5, 'maxvel': 40, 'maxacc': 340}):
    """Run saccade detection on every supported file below a directory.

    The directory tree is walked recursively. Each file whose extension is
    a key of ``ending_to_function`` is handed to the matching loader
    together with ``call_saccade_detection_on_data``. The collected summary
    is printed and written to ``results/<output_csv>``.

    Parameters:
        directory_path (str): Root directory that contains the recordings.
        output_csv (str): File name of the CSV written below ``results/``.
        parameters (dict): Detection thresholds ('missing', 'minlen',
            'maxvel', 'maxacc'), passed unchanged to each loader.

    Returns:
        pd.DataFrame: A DataFrame containing the saccade information for
        each analyzed recording/task.
    """
    saccade_info = {
        'Participant': [],
        'Task': [],
        'Saccade Count': [],
        'Total Saccade Duration [ms]': [],
        'Average Saccade Duration [ms]': [],
        'Average Saccade Distance [px]': []
    }

    # Visit every file in the directory and its subfolders
    for directory, _subdirs, files in os.walk(directory_path):
        for file_name in files:
            ending = os.path.splitext(file_name)[1]
            if ending in ending_to_function:
                ending_to_function[ending](directory, file_name, saccade_info, call_saccade_detection_on_data, parameters)

    count_df = pd.DataFrame(saccade_info)

    output_csv = os.path.join("results/", output_csv)
    # to_csv does not create missing directories, so make sure the target
    # directory exists before writing
    os.makedirs(os.path.dirname(output_csv), exist_ok=True)
    count_df.to_csv(output_csv, index=False)

    print(count_df)

    print(f"Saccade information saved to {output_csv}")
    return count_df




The functions are now declared; we can now analyze the data.

In [8]:

# Earlier runs with the default parameters, kept for reference (disabled):
#fixation_data_analysis('data\\26\\raw_data\\', "pygaze_fixations_26.csv")
#saccade_data_analysis('data\\26\\raw_data\\', "pygaze_saccades_26.csv")
#fixation_data_analysis('data\\54\\', "pygaze_fixations_54.csv")
#saccade_data_analysis('data\\54\\', "pygaze_saccades_54.csv")
#fixation_data_analysis('data\\4\\formatted-raw-data\\', "pygaze_fixations_4_2.csv")
#saccade_data_analysis('data\\4\\formatted-raw-data\\', "pygaze_saccades_4.csv")
#fixation_data_analysis('data\\emip_dataset\\rawdata\\', "pygaze_fixations_emip_dataset.csv")
#saccade_data_analysis('data\\emip_dataset\\rawdata\\', "pygaze_saccades_emip_dataset.csv")

# Parameter sweep on the EMIP raw data: one run per (label, maxdist, mindur).
# The first four runs vary maxdist around its default of 25, the last four
# vary mindur around its default of 50.
_sweep = (
    ("lower_maxdist1", 15, 50),
    ("lower_maxdist2", 10, 50),
    ("higher_maxdist1", 35, 50),
    ("higher_maxdist2", 50, 50),
    ("lower_mindur1", 25, 35),
    ("lower_mindur2", 25, 20),
    ("higher_mindur1", 25, 65),
    ("higher_mindur2", 25, 80),
)
for _label, _maxdist, _mindur in _sweep:
    fixation_data_analysis(
        'data\\emip_dataset\\rawdata\\',
        "pygaze_fixations_emip_dataset_" + _label + ".csv",
        {'missing': 0.0, 'maxdist': _maxdist, 'mindur': _mindur},
    )



prepare_tobii_data data\emip_dataset\rawdata\100_rawdata.tsv
possible skipped rows worked 37
prepare_tobii_data data\emip_dataset\rawdata\101_rawdata.tsv
possible skipped rows worked 32
prepare_tobii_data data\emip_dataset\rawdata\102_rawdata.tsv
possible skipped rows worked 37
prepare_tobii_data data\emip_dataset\rawdata\103_rawdata.tsv
possible skipped rows worked 32
prepare_tobii_data data\emip_dataset\rawdata\104_rawdata.tsv
possible skipped rows worked 37
prepare_tobii_data data\emip_dataset\rawdata\105_rawdata.tsv
possible skipped rows worked 32
prepare_tobii_data data\emip_dataset\rawdata\106_rawdata.tsv
possible skipped rows worked 37
prepare_tobii_data data\emip_dataset\rawdata\107_rawdata.tsv
possible skipped rows worked 32
prepare_tobii_data data\emip_dataset\rawdata\108_rawdata.tsv
possible skipped rows worked 37
prepare_tobii_data data\emip_dataset\rawdata\109_rawdata.tsv
possible skipped rows worked 41
prepare_tobii_data data\emip_dataset\rawdata\10_rawdata.tsv
possible s

Analyze the original data for fixations/saccades

In [2]:
import numpy as np

# Settings for the fixation classification, collected in one dictionary.
opt = dict(
    # --- General variables for eye-tracking data ---
    xres=1920.0,  # maximum value of horizontal resolution in pixels
    yres=1080.0,  # maximum value of vertical resolution in pixels
)
opt.update(
    # Missing values for horizontal / vertical position (the example data
    # uses -xres and -yres). Used throughout internal_helpers as signal for
    # data loss.
    missingx=-opt['xres'],
    missingy=-opt['yres'],
    # Sampling frequency of the data (check that this value matches the
    # values actually obtained from measurement!)
    freq=250.0,

    # --- Variables for the calculation of visual angle ---
    # Used to calculate noise measures (RMS and BCEA) of fixations. They
    # may be left as is, but then don't use the noise measures. If either
    # or both are empty, the noise measures are provided in pixels instead
    # of degrees.
    scrSz=[55.0, 32.5],  # screen size in cm
    disttoscreen=65.0,  # distance to screen in cm

    # --- Steffen interpolation ---
    windowtimeInterp=0.1,  # max duration (s) of missing values for interpolation to occur
    edgeSampInterp=2,  # number of samples needed at the edges for interpolation
    # maximum displacement during missing data for interpolation to be possible
    maxdisp=opt['xres'] * 0.2 * np.sqrt(2),

    # --- K-means clustering ---
    # time window (s) over which to calculate 2-means clustering (choose
    # the value so that at most one saccade can occur)
    windowtime=0.2,
    # time window shift (s) for each iteration; use zero for sample by
    # sample processing
    steptime=0.02,
    # maximum number of errors allowed in the k-means clustering procedure
    # before proceeding to the next file
    maxerrors=100,
    downsamples=[2, 5, 10],
    # Use a Chebyshev filter when down sampling? Requires the signal
    # processing toolbox. It is what matlab's down sampling
    # internal_helpers do, but could cause trouble (ringing) with the hard
    # edges in eye-movement data.
    downsampFilter=False,

    # --- Fixation determination ---
    # number of standard deviations above the mean k-means weights that is
    # used as fixation cutoff
    cutoffstd=2.0,
    # number of MAD away from the median fixation duration; used to walk
    # forward at fixation starts and backward at fixation ends to refine
    # their placement and stop the algorithm from eating into saccades
    onoffsetThresh=3.0,
    maxMergeDist=40.0,  # maximum Euclidean distance in pixels between fixations for merging
    maxMergeTime=60.0,  # maximum time in ms between fixations for merging
    # minimum fixation duration after merging; shorter fixations are
    # removed from the output
    minFixDur=90.0,
)

In [40]:

def analyze_tobii_data(directory_path, file_name, fixation_info):
    """Load the gaze columns of one Tobii TSV export.

    NOTE: the function only reads the data. It neither adds anything to
    ``fixation_info`` nor returns a value.
    """
    gaze_columns = ['Gaze point X [DACS px]', 'Gaze point Y [DACS px]', 'Recording timestamp [ms]']

    # read the export, keep the three gaze columns, replace gaps with 0.0
    table = pd.read_csv(os.path.join(directory_path, file_name), delimiter='\t', low_memory=False)
    table = table[gaze_columns].fillna(0.0)

    # x position, y position and timestamp of every sample
    x, y, time = (table[name] for name in gaze_columns)

def analyze_csv_data(directory_path, file_name, fixation_info):
    """Summarize one CSV file from its time column and append the result.

    The file name is split at '-': the first part without its leading
    character is the task number, the second part is the participant id.
    The number of rows is stored as fixation count, the span of the time
    column (multiplied by 1000) as total duration, and span / row count as
    average duration.
    """
    name_parts = file_name.split('-')
    task_number = name_parts[0][1:]
    participant_id = name_parts[1]

    table = pd.read_csv(os.path.join(directory_path, file_name), delimiter=',', low_memory=False, on_bad_lines='skip')

    # the time column is identified by the 'TIME(' prefix of its name
    time_columns = table.filter(like='TIME(')
    # convert time to milliseconds
    time_ms = time_columns[time_columns.columns[0]] * 1000

    row_count = len(time_ms)
    total_duration = time_ms.max() - time_ms.min()
    average_duration = total_duration / row_count

    summary = (
        ('Participant', participant_id),
        ('Task', task_number),
        ('Fixation Count', row_count),
        ('Total Fixation Duration [ms]', total_duration),
        ('Average Fixation Duration [ms]', average_duration),
    )
    for column, value in summary:
        fixation_info[column].append(value)



def analyze_xml_data(directory_path, file_name, fixation_info):
    """Load the gaze samples of one 'eclipse' XML file.

    Files whose name does not contain 'eclipse' are ignored. The
    participant id is the third-to-last component of ``directory_path``;
    the task is the third '-'-separated part of the second-to-last
    component.

    NOTE: the samples are only parsed. Nothing is added to
    ``fixation_info`` and nothing is returned.
    """
    # We only want the eclipse xml files as those have the x and y coordinates.
    # (str has no .contains() method; the membership test is `in`.)
    if 'eclipse' not in file_name:
        return

    xml_file = os.path.join(directory_path, file_name)

    path_elements = directory_path.split(os.sep)
    participant_id = path_elements[-3]
    task = path_elements[-2].split('-')[2]

    tree = ET.parse(xml_file)
    root = tree.getroot()

    # Extract one record per <response> element
    data = []
    for response_elem in root.findall(".//response"):
        x = float(response_elem.get("x"))
        y = float(response_elem.get("y"))
        event_time = int(response_elem.get("event_time"))

        data.append({"event_time": event_time, "x": x, "y": y})

    # Create DataFrame
    df = pd.DataFrame(data)


	
# Dispatch table for the analysis of the "original" data: maps a file
# extension to the analyze_* function for that format. The 'study4' entry is
# not an extension; it can only be selected explicitly through the fn_type
# argument of original_fixation_data_analysis.
# NOTE(review): analyze_csv_study_4_data is not defined in the visible part of
# this file -- confirm it is defined before this statement runs, otherwise
# building the dictionary raises NameError.
original_ending_to_function = {
	'.tsv': analyze_tobii_data,
	'.csv': analyze_csv_data,
	'study4': analyze_csv_study_4_data,
	'.xml': analyze_xml_data
}

In [41]:
def original_fixation_data_analysis(directory_path, output_csv = "pygaze_fixations_original.csv", fn_type = None):
    """Summarize the already-processed ("original") data below a directory.

    The directory tree is walked recursively. Each file whose extension is
    a key of ``original_ending_to_function`` is analyzed, either with the
    function registered for that extension or, when ``fn_type`` is given,
    with the function registered under ``fn_type``. The collected summary
    is printed and written to ``results/<output_csv>``.

    Parameters:
        directory_path (str): Root directory that contains the files.
        output_csv (str): File name of the CSV written below ``results/``.
        fn_type (str | None): Key of ``original_ending_to_function`` that
            overrides the per-extension choice of the analysis function.

    Returns:
        pd.DataFrame: One row per analyzed file.
    """
    fixation_info = {
        'Participant': [],
        'Task': [],
        'Fixation Count': [],
        'Total Fixation Duration [ms]': [],
        'Average Fixation Duration [ms]': []
    }

    # List all files in the directory and subfolders, keyed by directory
    files_by_directory = {}
    for root, dirs, files in os.walk(directory_path):
        files_by_directory[root] = list(files)

    print(files_by_directory.items())
    for directory, dir_file_names in files_by_directory.items():
        for file_name in dir_file_names:
            ending = os.path.splitext(file_name)[1]
            if ending in original_ending_to_function:
                # an explicit fn_type replaces the per-extension function
                key = fn_type if fn_type is not None else ending
                original_ending_to_function[key](directory, file_name, fixation_info)

    count_df = pd.DataFrame(fixation_info)

    output_csv = os.path.join("results/", output_csv)
    # to_csv does not create missing directories, so make sure the target
    # directory exists before writing
    os.makedirs(os.path.dirname(output_csv), exist_ok=True)
    count_df.to_csv(output_csv, index=False)

    print(count_df)

    print(f"Fixation information saved to {output_csv}")
    return count_df


dict_items([('data\\26\\fixations\\', ['T0-P108-fixations.csv', 'T0-P164-fixations.csv', 'T0-P165-fixations.csv', 'T0-P169-fixations.csv', 'T0-P193-fixations.csv', 'T0-P228-fixations.csv', 'T0-P309-fixations.csv', 'T0-P313-fixations.csv', 'T0-P320-fixations.csv', 'T0-P322-fixations.csv', 'T0-P327-fixations.csv', 'T0-P340-fixations.csv', 'T0-P370-fixations.csv', 'T0-P372-fixations.csv', 'T0-P376-fixations.csv', 'T0-P379-fixations.csv', 'T0-P382-fixations.csv', 'T0-P393-fixations.csv', 'T0-P431-fixations.csv', 'T0-P435-fixations.csv', 'T0-P442-fixations.csv', 'T0-P459-fixations.csv', 'T0-P490-fixations.csv', 'T0-P523-fixations.csv', 'T0-P539-fixations.csv', 'T0-P548-fixations.csv', 'T0-P561-fixations.csv', 'T0-P604-fixations.csv', 'T0-P606-fixations.csv', 'T0-P620-fixations.csv', 'T0-P623-fixations.csv', 'T0-P627-fixations.csv', 'T0-P637-fixations.csv', 'T0-P641-fixations.csv', 'T0-P709-fixations.csv', 'T0-P718-fixations.csv', 'T0-P722-fixations.csv', 'T0-P742-fixations.csv', 'T0-P745-fi

In [None]:
# Summarize the filtered data of study 26; the result table is written to
# results/pygaze_fixations_26.csv.
original_fixation_data_analysis('data\\26\\dataEvaluation\\data\\filtered_data', "pygaze_fixations_26.csv")

In [36]:
import numpy as np

# Settings for the fixation classification, collected in one dictionary.
opt = dict(
    # --- General variables for eye-tracking data ---
    xres=1920.0,  # maximum value of horizontal resolution in pixels
    yres=1080.0,  # maximum value of vertical resolution in pixels
)
opt.update(
    # Missing values for horizontal / vertical position (the example data
    # uses -xres and -yres). Used throughout internal_helpers as signal for
    # data loss.
    missingx=-opt['xres'],
    missingy=-opt['yres'],
    # Sampling frequency of the data (check that this value matches the
    # values actually obtained from measurement!)
    freq=250.0,

    # --- Variables for the calculation of visual angle ---
    # Used to calculate noise measures (RMS and BCEA) of fixations. They
    # may be left as is, but then don't use the noise measures. If either
    # or both are empty, the noise measures are provided in pixels instead
    # of degrees.
    scrSz=[55.0, 32.5],  # screen size in cm
    disttoscreen=65.0,  # distance to screen in cm

    # --- Steffen interpolation ---
    windowtimeInterp=0.1,  # max duration (s) of missing values for interpolation to occur
    edgeSampInterp=2,  # number of samples needed at the edges for interpolation
    # maximum displacement during missing data for interpolation to be possible
    maxdisp=opt['xres'] * 0.2 * np.sqrt(2),

    # --- K-means clustering ---
    # time window (s) over which to calculate 2-means clustering (choose
    # the value so that at most one saccade can occur)
    windowtime=0.2,
    # time window shift (s) for each iteration; use zero for sample by
    # sample processing
    steptime=0.02,
    # maximum number of errors allowed in the k-means clustering procedure
    # before proceeding to the next file
    maxerrors=100,
    downsamples=[2, 5, 10],
    # Use a Chebyshev filter when down sampling? Requires the signal
    # processing toolbox. It is what matlab's down sampling
    # internal_helpers do, but could cause trouble (ringing) with the hard
    # edges in eye-movement data.
    downsampFilter=False,

    # --- Fixation determination ---
    # number of standard deviations above the mean k-means weights that is
    # used as fixation cutoff
    cutoffstd=2.0,
    # number of MAD away from the median fixation duration; used to walk
    # forward at fixation starts and backward at fixation ends to refine
    # their placement and stop the algorithm from eating into saccades
    onoffsetThresh=3.0,
    maxMergeDist=40.0,  # maximum Euclidean distance in pixels between fixations for merging
    maxMergeTime=60.0,  # maximum time in ms between fixations for merging
    # minimum fixation duration after merging; shorter fixations are
    # removed from the output
    minFixDur=90.0,
)

def analyze_csv_data_study_24():
	"""Detect fixations for every trial of study 24 and save them as CSV.

	Reads ``data/24/dataEvaluation/data/filteredData/filtered_data.csv``
	(relative to the current working directory). Each row of that file
	references one eye-tracking recording through its ``Eyetracking`` column.
	For every row the recording is loaded, the normalised display coordinates
	are scaled by ``opt["xres"]`` / ``opt["yres"]``, samples that are invalid
	or far off screen are flagged as missing, both eyes are combined into one
	gaze signal, and ``fixation_detection_fixed`` is run on that signal.

	The start times, end times and positions of the detected fixations are
	stored as stringified NumPy arrays, one row per trial, in
	``results/pygaze_fixations_24.csv`` (separator ``;``).

	returns
	None; the result is written to disk.
	"""
	# Build the paths from components so they work on every platform, not
	# only with Windows separators.
	base_path = os.path.join(os.getcwd(), "data", "24", "dataEvaluation")
	csv_file = os.path.join(base_path, "data", "filteredData", "filtered_data.csv")
	df_behavioral = pd.read_csv(csv_file)
	df_fixation = pd.DataFrame([], columns=["Participant", "Algorithm", "Behavioral", "StartTime", "EndTime", "Duration", "IsOutlier", "SkillScore",
										"Fixation_startT", "Fixation_endT",  "Fixation_x", "Fixation_y", "Fixation_x_range", "Fixation_y_range"])

	# Fixation detection parameters
	missing = 0.0  # Value that marks a missing sample for the detector
	maxdist = 25  # Maximum distance for a fixation (adjust as needed)
	mindur = 50  # Minimum duration for a fixation (adjust as needed)

	xres = opt["xres"]
	yres = opt["yres"]

	# 3D gaze point / gaze origin columns that are not needed here
	unused_columns = ["{}_{}_in_user_coordinate_system_{}".format(eye, kind, axis)
					for kind in ("gaze_point", "gaze_origin")
					for eye in ("l", "r")
					for axis in ("x", "y", "z")]

	# iterate through each row to generate fixation data
	for index, row in df_behavioral.iterrows():
		print("Analyzing row", index, "of particpants", row["Participant"])

		# Resolve the recording relative to base_path; a leading './' is
		# removed only if it is actually present.
		relative_path = row["Eyetracking"].replace("\\", "/")
		if relative_path.startswith("./"):
			relative_path = relative_path[2:]
		eyetracking_file = os.path.normpath(os.path.join(base_path, relative_path))
		df_eyetracking = pd.read_csv(eyetracking_file)

		# Let the time start at 0 and scale it by 1000 (the detector expects
		# milliseconds; the input is assumed to be in seconds).
		timestamps = df_eyetracking["time"].astype(float)
		df_eyetracking["time"] = (timestamps - timestamps.iloc[0]) * 1000.0

		# drop unused columns
		df_eyetracking = df_eyetracking.drop(columns=unused_columns)

		# Scale the display coordinates by the screen resolution and flag
		# missing samples per eye. A sample is missing when it lies more than
		# one screen size outside the screen or when its validity flag is
		# not >= 1.
		is_missing = {}
		for eye in ("l", "r"):
			x_col = eye + "_display_x"
			y_col = eye + "_display_y"
			df_eyetracking[x_col] = df_eyetracking[x_col].astype(float) * xres
			df_eyetracking[y_col] = df_eyetracking[y_col].astype(float) * yres
			off_screen_x = (df_eyetracking[x_col] < -xres) | (df_eyetracking[x_col] > 2 * xres)
			off_screen_y = (df_eyetracking[y_col] < -yres) | (df_eyetracking[y_col] > 2 * yres)
			is_missing[eye] = off_screen_x | off_screen_y | ~(df_eyetracking[eye + "_valid"] >= 1)

		# Combine both eyes BEFORE the missing placeholders are written:
		# missing samples are masked (NaN) so that the mean only uses the
		# valid eye(s). Averaging a valid coordinate with the placeholder
		# would shift the gaze position, and the averaged placeholder would
		# not equal the `missing` value the detector filters on.
		gaze_x = pd.concat([df_eyetracking["l_display_x"].mask(is_missing["l"]),
							df_eyetracking["r_display_x"].mask(is_missing["r"])], axis=1).mean(axis=1)
		gaze_y = pd.concat([df_eyetracking["l_display_y"].mask(is_missing["l"]),
							df_eyetracking["r_display_y"].mask(is_missing["r"])], axis=1).mean(axis=1)
		# Samples without any usable eye get `missing` in both coordinates so
		# that the detector removes them.
		no_gaze = gaze_x.isna() | gaze_y.isna()
		gaze_x = gaze_x.mask(no_gaze, missing)
		gaze_y = gaze_y.mask(no_gaze, missing)

		# Set a default value for missing data in the per-eye columns
		df_eyetracking.loc[is_missing["l"], "l_display_x"] = opt["missingx"]
		df_eyetracking.loc[is_missing["l"], "l_display_y"] = opt["missingy"]
		df_eyetracking.loc[is_missing["r"], "r_display_x"] = opt["missingx"]
		df_eyetracking.loc[is_missing["r"], "r_display_y"] = opt["missingy"]

		# rename columns to match I2MC format
		df_eyetracking.rename(columns={"l_display_x": "L_X",
									"l_display_y": "L_Y",
									"r_display_x": "R_X",
									"r_display_y": "R_Y",
									"l_valid" : "LValidity",
									"r_valid" : "RValidity"}, inplace=True)

		# Combined gaze signal used for the fixation detection
		df_eyetracking["X"] = gaze_x
		df_eyetracking["Y"] = gaze_y

		# Perform fixation detection using the fixed fixation_detection function.
		# Only the end-of-fixation records are needed:
		# [starttime, endtime, duration, endx, endy]
		_, Efix = fixation_detection_fixed(df_eyetracking["X"], df_eyetracking["Y"], df_eyetracking["time"], missing=missing, maxdist=maxdist, mindur=mindur)

		# extract fixation data
		fixations_start_time = np.array([entry[0] for entry in Efix])
		fixations_end_time = np.array([entry[1] for entry in Efix])
		fixations_x_pos = np.array([entry[3] for entry in Efix])
		fixations_y_pos = np.array([entry[4] for entry in Efix])
		# this detector does not provide position ranges
		fixations_x_range = np.array([])
		fixations_y_range = np.array([])

		# append meta data and fixation data to dataframe
		df_fixation.loc[len(df_fixation)] = [row["Participant"], row["Algorithm"], row["Behavioral"], row["StartTime"], row["EndTime"],
											row["Duration"], row["IsOutlier"], row["SkillScore"],
											fixations_start_time, fixations_end_time, fixations_x_pos, fixations_y_pos, fixations_x_range, fixations_y_range]

	# Transform the arrays to strings.
	# NOTE(review): str() of a NumPy array abbreviates arrays longer than the
	# print threshold (1000 elements by default) with '...'; confirm that no
	# trial yields that many fixations.
	for column in ("Fixation_startT", "Fixation_endT", "Fixation_x", "Fixation_y", "Fixation_x_range", "Fixation_y_range"):
		df_fixation[column] = df_fixation[column].astype(str)

	# Save the data; create the output folder if it does not exist yet
	os.makedirs("results", exist_ok=True)
	df_fixation.to_csv("results/pygaze_fixations_24.csv", index=False, sep=";", float_format='{:f}'.format)






In [37]:
# Run the fixation analysis for study 24; the result is written to
# results/pygaze_fixations_24.csv
analyze_csv_data_study_24()


Analyzing row 0 of particpants 1
Analyzing row 1 of particpants 1
Analyzing row 2 of particpants 1
Analyzing row 3 of particpants 1
Analyzing row 4 of particpants 1
Analyzing row 5 of particpants 1
Analyzing row 6 of particpants 1
Analyzing row 7 of particpants 1
Analyzing row 8 of particpants 1
Analyzing row 9 of particpants 1
Analyzing row 10 of particpants 1
Analyzing row 11 of particpants 1
Analyzing row 12 of particpants 1
Analyzing row 13 of particpants 1
Analyzing row 14 of particpants 1
Analyzing row 15 of particpants 1
Analyzing row 16 of particpants 1
Analyzing row 17 of particpants 1
Analyzing row 18 of particpants 1
Analyzing row 19 of particpants 1
Analyzing row 20 of particpants 1
Analyzing row 21 of particpants 1
Analyzing row 22 of particpants 1
Analyzing row 23 of particpants 1
Analyzing row 24 of particpants 1
Analyzing row 25 of particpants 1
Analyzing row 26 of particpants 1
Analyzing row 27 of particpants 1
Analyzing row 28 of particpants 1
Analyzing row 29 of part