# Import

In [1]:
import pandas as pd
import numpy as np
import pygaze
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import scipy as sp
import glob
from tqdm import tqdm

In [None]:
pygaze.__version__

# Fixation and saccade detection

In [None]:
df_data = pd.read_csv("data/PISSS_ID_003_Approach Three Go-Around Gaze-Left.csv")

In [None]:
df_not_null = df_data[df_data["Display"] == 1]

In [None]:
class Point:
    """
    class to define a Point
    """
    def __init__(self,x,y):
        self.x = x
        self.y = y

In [None]:
def calculate_gaze_distance(first_point, destination_point):
    """
    calculate gaze distance between 2 points

    Arguments:
        first_point (Point): first point
        destination_point (Point): second point

    Return:
        (float): distance between 2 points
    """
    return np.sqrt((first_point.x - destination_point.x)**2 + (first_point.y - destination_point.y)**2)

def distance_to_gaze_angle(distance_to_view, gaze_distance):
    """
    convert gaze distance to gaze angle
    assume that distances from pilot's eyes to first point and to destionation point are equal

    Arguments:
        distance_to_view (float): distance from pilot's eyes to the control
        gaze_distance (float): gaze distance

    Return:
        (float): angle of the gaze between 2 points

    """
    return np.arccos(1 - (1/2)*((gaze_distance/distance_to_view)**2))

In [None]:
def calculate_all_distance(list_points):
    """
    Calculate distance between all points consequently

    Arguments:
        list_points (Point): all points
    
    Return:
        all_distance (list(float)): list of distance between Point(n) and Point(n+1)
    
    """
    all_distance = [0]
    for i in range(len(list_points) - 1):
        all_distance.append(calculate_gaze_distance(list_points[i],list_points[i+1]))
    return all_distance

In [None]:
list_points = []
for i in range(len(df_not_null)):
    x = df_not_null.iloc[i]
    pt = Point(int(float(x["X Pos"])), int(float(x["Y Pos"])))
    list_points.append(pt)


In [None]:
def fixation_saccade_classification(list_points, distance_threshold, angle_threshold = 5, time_threshold = 140):
    """
    classify fixation and saccade based on the angle of the eye's gaze,
    if a gaze exceeds 5 degree, it will be considered as a saccade

    Arguments:
        list_points (list(Point)): all points
        distance_threshold (float): if a gaze's distance exceeds this threshold, it will be considered as a saccade (to be removed, use angle threshold instead)
        angle_threshold (float): default by 5 degree
        time_threshold (float): in ms, to determine long, short fixation (to do)
    
    Return:
        list_fixation_saccade (list(int)): denote the moment is a fixation (0) or a saccade (1)
    """
    list_distance = calculate_all_distance(list_points)
    list_fixation_saccade = [1 if v > distance_threshold else 0 for v in list_distance]
    list_fixation_saccade = list_fixation_saccade
    return list_fixation_saccade 

In [None]:
list_fixation_saccade = fixation_saccade_classification(list_points, 13)

In [None]:
# df_not_null["distance"] = list_distance
df_not_null["fixation/saccade"] = list_fixation_saccade

In [None]:
x_coord = df_not_null["X Pos"]
y_coord = df_not_null["Y Pos"]

diameter = df_not_null["Pupil Diameter"]
img = plt.imread("flight.jpg")
h,w,c = img.shape

cols=[]

for i in range(len(list_fixation_saccade)):
    if list_fixation_saccade[i] == 0:
        cols.append("red")
    else:
        cols.append("blue")

    
fig=plt.figure(figsize=(15,8))
ax=fig.add_axes([0,0,1,1])
# ax.scatter(x_coord[0], y_coord[0], "-o")
ax.imshow(img, extent=[0, 1600, 0, 900])

ax.plot()
ax.scatter(x=x_coord,y=y_coord,c=cols)


ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_title('gaze')

plt.show()

In [None]:
df_data[df_data["X Pos"]==0.0]

# scan pattern

In [2]:
import detectors
import gazeplotter

In [None]:
participant_ID = "029"

In [None]:
df_data = pd.read_csv("data/PISSS_ID_{}_Approach One Gaze-Vergence.csv".format(participant_ID))

In [None]:
df_data.fillna(0.0, inplace=True)

In [None]:
X = np.array(df_data['X Pos'].tolist())
Y = np.array(df_data['Y Pos'].tolist())
time = np.array(df_data['Start Time (secs)'].tolist())*1000

In [None]:
Sblk, Eblk = detectors.blink_detection(X,Y,time,minlen=10)

In [None]:
Sfix, Efix = detectors.fixation_detection(X,Y,time,maxdist=10,mindur=50)

In [None]:
Ssac, Esac = detectors.saccade_detection(X,Y,time,minlen=5,maxvel=40,maxacc=340)

In [None]:
len(Efix)

In [None]:
gazeplotter.draw_scanpath(Efix, Esac, dispsize=(1023,682), imagefile="flight.jpg", savefilename="scanpath_{}.png".format(participant_ID))

# gaze transition entropy

In [3]:
from sklearn.cluster import DBSCAN

In [None]:
X = np.array(Efix).T[3:].T

In [None]:
clustering = DBSCAN(eps=20, min_samples=3).fit(X)

In [None]:
color = clustering.labels_
color

In [4]:
def get_center(clustering, data):
    center = []
    for i in range(len(set(clustering.labels_)) - 1):
        xi = data[np.where(clustering.labels_ == i)]
        cx = sum(xi.T[0])/len(xi)
        cy = sum(xi.T[1])/len(xi)
        center.append((cx,cy))
    return center

# cluster_center = get_center(clustering,X)

In [None]:
img = plt.imread("flight.jpg")
h,w,c = img.shape
fig=plt.figure(figsize=(15,8))
ax=fig.add_axes([0,0,1,1])
# ax.imshow(img, extent=[0, 1600, 0, 900])
ax.imshow(img)

ax.scatter(X.T[0],X.T[1], c=color, cmap="jet")

ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_title('gaze')

# plt.show()

In [None]:
# gazeplotter.draw_heatmap(Efix, dispsize=(1023,682), imagefile="flight.jpg")

In [None]:
window_length = -1 #10sx60Hz

In [None]:
df_data_window = df_data[:window_length]

In [None]:
wX = np.array(df_data_window['X Pos'].tolist())
wY = np.array(df_data_window['Y Pos'].tolist())
wtime = np.array(df_data_window['Start Time (secs)'].tolist())*1000

In [None]:
wSfix, wEfix = detectors.fixation_detection(wX,wY,wtime,maxdist=20,mindur=50)

In [None]:
len(wEfix)

In [5]:
def transition_matrix(transitions):
    n = 1+ max(transitions) #number of states

    M = [[0]*n for _ in range(n)]

    for (i,j) in zip(transitions,transitions[1:]):
        M[i][j] += 1

    #now convert to probabilities:
    for row in M:
        s = sum(row)
        if s > 0:
            row[:] = [f/s for f in row]
    return M


In [6]:
def distance(x,y):
    return ((x[0]-y[0])**2 + (x[1]-y[1])**2)**0.5

def dbscan_predict(cluster_center, X_new, min_dist = 50, metric=distance):
    # Result is noise by default
    y_new = np.ones(shape=len(X_new), dtype=int)*-1 

    # Iterate all input samples for a label
    for j, x_new in enumerate(X_new):
        # Find a core sample closer than EPS
        for i, x_core in enumerate(cluster_center): 
            if metric(x_new, x_core) < min_dist:
                # Assign label of x_core to x_new
                y_new[j] = i
                break

    return y_new

In [None]:
pred = dbscan_predict(cluster_center, np.array(wEfix).T[3:].T)

In [None]:
pred

In [None]:
transitions = pred[np.where(pred!=-1)]

In [None]:
transitions

In [None]:
trans_matrix = transition_matrix(transitions)
np.around(transition_matrix(transitions),2)

In [None]:
pA = [len(np.where(transitions==i)[0])/len(transitions) for i in range(len(set(transitions)))]

In [None]:
# pA[0]*(trans_matrix[0]*np.log2(trans_matrix[0]))

In [None]:
H = 0
for i in range(len(pA)):
    H += -sum(pA[i]*(trans_matrix[0]*np.log2(trans_matrix[0])))
H

# Velocity

In [None]:
def velocity(x, y, time, missing=0.0):
    x, y, time = detectors.remove_missing(x,y,time,missing)
    intdist = (np.diff(x)**2 + np.diff(y)**2)**0.5
    # get inter-sample times
    inttime = np.diff(time)
    # recalculate inter-sample times to seconds
    inttime = inttime
    
    # VELOCITY AND ACCELERATION
    # the velocity between samples is the inter-sample distance
    # divided by the inter-sample time
    vel = intdist / inttime
    # the acceleration is the sample-to-sample difference in
    # eye movement velocity
    acc = np.diff(vel)
    new_time = time - time[0]
    
    return vel, acc, new_time

In [None]:
vel, acc, new_time = velocity(X,Y,time)

In [None]:
fig, ax = plt.subplots(figsize=(15,8))
plt.plot(new_time[:-1], vel, "r-")

ax.set_xlabel("time (ms)")
ax.set_ylabel("Velocity (pixel/ms)")
plt.show()

# All data

In [7]:
csv_files = glob.glob("data/*.csv")

In [8]:
csv_files_one = [v for v in csv_files if "One Gaze-Left" in v]
csv_files_two = [v for v in csv_files if "Two Gaze-Left" in v]
csv_files_three = [v for v in csv_files if "Three Go-Around Gaze-Left" in v]

In [9]:
trials = []

for csv in [csv_files_one, csv_files_two, csv_files_three]:
    arr_Sblk = []
    arr_Eblk = []
    arr_Sfix = []
    arr_Efix = []
    arr_Ssac = []
    arr_Esac = []
    arr_trans_matrix = []
    arr_Ht = []
    arr_Hs = []
    arr_total_time = []

    for f in tqdm(csv):
        df_data = pd.read_csv(f)
        df_data.fillna(0.0, inplace=True)

        X = np.array(df_data['X Pos'].tolist())
        Y = np.array(df_data['Y Pos'].tolist())
        time = np.array(df_data['Start Time (secs)'].tolist())*1000

        Sblk, Eblk = detectors.blink_detection(X,Y,time,minlen=6)
        Sfix, Efix = detectors.fixation_detection(X,Y,time,maxdist=10,mindur=50)
        Ssac, Esac = detectors.saccade_detection(X,Y,time,minlen=5,maxvel=40,maxacc=340)

        X = np.array(Efix).T[3:].T
        clustering = DBSCAN(eps=20, min_samples=3).fit(X)
        cluster_center = get_center(clustering,X)
        pred = dbscan_predict(cluster_center, np.array(Efix).T[3:].T)
        transitions = pred[np.where(pred!=-1)]

        trans_matrix = transition_matrix(transitions)
    #     np.around(transition_matrix(transitions),2)
        pA = [len(np.where(transitions==i)[0])/len(transitions) for i in range(len(set(transitions)))]
        Ht = 0
        Hs = 0
        for i in range(len(pA)):
            Hs += pA[i]*np.log2(pA[i])
            Ht += -sum(pA[i]*(trans_matrix[0]*np.log2(trans_matrix[0])))



        arr_Sblk.append(Sblk)
        arr_Eblk.append(Eblk)
        arr_Sfix.append(Sfix)
        arr_Efix.append(Efix)
        arr_Ssac.append(Ssac)
        arr_Esac.append(Esac)
        arr_trans_matrix.append(trans_matrix)
        arr_Ht.append(Ht)
        arr_Hs.append(Hs)
        arr_total_time.append(time[-1] - time[0])
        
    d = {
            "Eblk": arr_Eblk,
            "Efix": arr_Efix,
            "Esac": arr_Esac,
            "trans_matrix": arr_trans_matrix,
            "Ht": arr_Ht,
            "Hs": arr_Hs,
            "total_time": arr_total_time
        }
        
    trials.append(d)
    
    

100%|██████████████████████████████████████████████████████████████████████████████████| 32/32 [00:03<00:00, 10.16it/s]


100%|██████████████████████████████████████████████████████████████████████████████████| 32/32 [00:03<00:00,  9.26it/s]


100%|██████████████████████████████████████████████████████████████████████████████████| 31/31 [00:00<00:00, 42.58it/s]


In [None]:
dur_fix = []
for i, fix in enumerate(arr_Efix):
    dur_fix.append(sum(np.array(fix).T[2])/arr_total_time[i])
    
    
dur_sac = []
for i, sac in enumerate(arr_Esac):
    dur_sac.append(sum(np.array(sac).T[2])/arr_total_time[i])
    

In [187]:
x = np.array([trials[0]['Ht'], trials[1]['Ht']]).T

In [203]:
np.isnan(x[0])

array([ True,  True])

In [231]:
x1 = []
for v in x:
    if True not in np.isnan(v):
        x1.append(v)
        
x1 = np.stack(x1, axis=0)

In [236]:
x1.T[0]

array([1.00163516, 1.31284805, 1.06095438, 0.81040979, 1.06333768,
       1.12310401, 1.15709709, 1.08166427, 1.26595191, 0.89363965,
       0.95862239, 1.28312837, 0.89258532])

In [242]:
np.std(x1.T[1])

0.2868128962164242

In [234]:
scipy.stats.ttest_ind(x1.T[0], x1.T[1], equal_var = False)

Ttest_indResult(statistic=0.9753270699610622, pvalue=0.3421494879702286)

In [134]:
dur_fix1 = np.array([])
mean_fix1 = []
for i, p in enumerate(trials[0]['Efix']):
    dur_fix1 = np.append(dur_fix1,np.array(p).T[2])
    mean_fix1.append(np.mean(np.array(p).T[2])/trials[0]['total_time'][i])

dur_fix2 = []
mean_fix2 = []
for i, p in enumerate(trials[1]['Efix']):
    dur_fix2 = np.append(dur_fix2,np.array(p).T[2])
    mean_fix2.append(np.mean(np.array(p).T[2])/trials[1]['total_time'][i])

In [165]:
scipy.stats.ttest_ind(dur_fix1, dur_fix2, equal_var = False)

Ttest_indResult(statistic=-3.0859705990978625, pvalue=0.002031127498962883)

In [164]:
np.std(dur_fix2)

215.17733450960404

In [150]:
np.mean(dur_fix1)

204.92303396467642

In [177]:
scipy.stats.ttest_ind(x, y, equal_var = False)

Ttest_indResult(statistic=0.5206482894598665, pvalue=0.6045157335075184)

In [244]:
x = np.array([len(v) for v in trials[0]['Eblk']])/np.array(trials[0]['total_time'])*1000
y = np.array([len(v) for v in trials[1]['Eblk']])/np.array(trials[1]['total_time'])*1000

In [246]:
print("mean trial 1:", np.mean(x))
print("mean trial 2:", np.mean(y))
print("std trial 1:", np.std(x))
print("std trial 2:", np.std(y))

mean trial 1: 0.25417298709911085
mean trial 2: 0.22899477908179017
std trial 1: 0.20536851539444673
std trial 2: 0.17412976115512052


In [176]:
np.mean(y)

0.22899477908179017