In [1]:
import json
import numpy as np
import csv
import scipy.stats.distributions
from scipy.spatial.transform import Rotation
import os
from matplotlib import pyplot as plt
import random
import math
import cvxpy as cp
import csv

# Helper Functions

### DIspersion filter

In [2]:
def dispersion_filtering(x, y, dispersion_threshold = 6, duraiton_threshold=0.2):
    start = 0
    window = []
    if len(x) == 0:
        return [], []
    fps = int(np.round(1/(x[1] - x[0])))
    duration_threshold_frames = np.ceil(duraiton_threshold * fps)
    
    def dispersion(arr):
        # input is a 2d array
        disp = np.max(arr[:, 0]) - np.min(arr[:, 0]) + np.max(arr[:, 1]) - np.min(arr[:, 1])
        return disp

    fixations = []
    fixations_intervals = []
    
    # while there are still points
    while int(start+duration_threshold_frames) < y.shape[0]:
        # initialize a window:
        window = list(range(int(start), int(start+duration_threshold_frames)))
        start = start + duration_threshold_frames
        disp = dispersion(y[window])
        # while the dispersion is less than the threshold
        while disp <= dispersion_threshold:
            if window[-1]+1 < y.shape[0]:
                window.append(window[-1]+1)
            start = start + 1
            if start >= y.shape[0]:
                break
            disp = dispersion(y[window])

        # if the current set of points never fit the duration criteria
        if len(window) <= duration_threshold_frames:
            start = start + 1
        # otherwise note it as fixations
        else:
            centroid = np.mean(y[window], axis=0)
            duration = (window[-1] - window[0]) / fps
            fixations.append([centroid[0], centroid[1], duration])
            fixations_intervals.append([window[0], window[-1]])
    fixations = np.array(fixations)
    return fixations, fixations_intervals


### Find Head Angle

In [3]:
def get_head_angles(head_dir, target_dir, target_fixations_intervals, target_fixation_duration, persistant_target_angle = 0):
    # find the head fixations that are within the target fixations
    head_fixation_final_val = []
    head_fixations_closest_val = []
    head_fixations_extreme_val = [] # not being used
    head_fixations_value_held = [] # not being used 
    head_fixations_value_held_isValid = [] # not being used
    # find a few head fixations that are within the target fixations
    non_zero_fixation_angles = []
    non_zero_fixation_duration = []
    # find a few head fixations that are within the target fixations
    for i in range(0, len(target_fixations_intervals)):
        angle = target_dir[target_fixations_intervals[i][0]:target_fixations_intervals[i][1], 2].mean()
        if np.abs(angle - persistant_target_angle) > 0:
            non_zero_fixation_angles.append(angle) # this only record non-zero angles
            non_zero_fixation_duration.append(target_fixation_duration[i])
            target_interval = target_fixations_intervals[i]
            start = target_interval[0]
            end = target_interval[1]
            ################# find the head intervals that starts within the target interval, and use average as angle of the head #################
            # intersected = []
            # for j in range(0, len(head_fixations_intervals)):
            #     if head_fixations_intervals[j][0] >= start and head_fixations_intervals[j][0] <= end:
            #         intersected.append(j)
            # # the fixation interval closest to the target interval is selected
            # closest_angle = 10000000
            # closest_index = -100
            # if len(intersected) > 0:
            #     for j in intersected:
            #         head_fixation_value = head_dir[head_fixations_intervals[j][0]:head_fixations_intervals[j][1]][:, 2].mean()
            #         target_angle = target_dir[target_fixations_intervals[i][0]:target_fixations_intervals[i][1]][:, 2].mean()
            #         if np.abs(head_fixation_value - target_angle) < closest_angle:
            #             closest_angle = np.abs(head_fixation_value - target_angle)
            #             closest_index = j
            #     head_fixations_value_held.append(head_dir[head_fixations_intervals[closest_index][0]:head_fixations_intervals[closest_index][1]][:, 2].mean())
            #     head_fixations_value_held_isValid.append(True)
            # else:
            #     head_fixations_value_held.append(head_dir[target_fixations_intervals[i][0]:target_fixations_intervals[i][1]][:, 2].mean())
            #     head_fixations_value_held_isValid.append(False)
            
            ################# find the closest value within the target interval #################
            extreme_value_index = np.argmin(np.abs(head_dir[start:end][:, 2] - angle))
            head_fixations_closest_val.append(head_dir[start:end][:, 2][extreme_value_index])
            # if head_fixations_closest_val[-1] / target_dir[start:end][:, 2].mean() < 0:
            #     print("wrong sign")
            #     continue

            ################# find the extreme value within the target interval #################
            extreme_value_index = np.argmax(np.abs(head_dir[start:end][:, 2]))
            head_fixations_extreme_val.append(head_dir[start:end][:, 2][extreme_value_index])

            ################# find the final value within target interval #################
            head_fixation_final_val.append(head_dir[end][2])
            # plt.plot(head_dir[start:end][:, 0], head_dir[start:end][:, 2], label="head")
            # plt.plot(target_dir[start:end][:, 0], target_dir[start:end][:, 2], label="target")
            # plt.plot(head_dir[start:end][:, 0], np.ones(head_dir[start:end][:, 0].shape) * head_fixations_extreme_val[-1], label="head fixation (from extreme value method)")
            # plt.plot(head_dir[start:end][:, 0], np.ones(head_dir[start:end][:, 0].shape) * head_fixations_value_held[-1], label="head fixation (from value held method)")
            # plt.plot(head_dir[start:end][:, 0], np.ones(head_dir[start:end][:, 0].shape) * head_fixations_closest_val[-1], label="head fixation (from closest value method)")
            # plt.plot(head_dir[start:end][:, 0], np.ones(head_dir[start:end][:, 0].shape) * head_fixation_final_val[-1], label="head fixation (from final value method)")
          
            # plt.legend()
            # plt.show()
        else:
            head_fixations_closest_val.append(persistant_target_angle)
            head_fixations_extreme_val.append(persistant_target_angle)
            head_fixations_value_held.append(persistant_target_angle)
            head_fixations_value_held_isValid.append(False)
            non_zero_fixation_angles.append(persistant_target_angle)
            non_zero_fixation_duration.append(persistant_target_angle)
    
    head_fixations_closest_val = np.array(head_fixations_closest_val)
    head_fixations_extreme_val = np.array(head_fixations_extreme_val)
    head_fixations_value_held = np.array(head_fixations_value_held)
    head_fixations_value_held_isValid = np.array(head_fixations_value_held_isValid)
    head_fixation_final_val = np.array(head_fixation_final_val)
    non_zero_fixation_angles = np.array(non_zero_fixation_angles)
    non_zero_fixation_duration = np.array(non_zero_fixation_duration)
    
    return head_fixations_closest_val, non_zero_fixation_angles, non_zero_fixation_duration

        



### find CHOR

In [4]:
class GaussianKernal:
    def __init__(self, loc, sigma, amplitude):
        self.sigma = sigma
        self.loc = loc
        self.amplitude = amplitude
        self.gaussina = scipy.stats.distributions.norm(loc, sigma)
    def cumulate(self, t_arr, x_arr):
        # t_arr is the time array
        # x_arr is the value array, in which we will cumulate a series of GaussianKernals
        x_arr += self.amplitude * self.gaussina.pdf(t_arr)
        return x_arr

def generate_smooth_distribution(fixation_angles):
    # generate a smooth distribution of the fixation ang
    kernal_list = []
    kernal_std = 3
    kernal_amplitude = 1
    for i in range(len(fixation_angles)):
        kernal_list.append(GaussianKernal(fixation_angles[i], kernal_std, kernal_amplitude))
    t_arr = np.linspace(-100, 100, 1000)
    x_arr = np.zeros(t_arr.shape)
    for kernal in kernal_list:
        x_arr = kernal.cumulate(t_arr, x_arr)
    x_arr = x_arr/np.sum(x_arr)
    return t_arr, x_arr, kernal_list
def generate_CHOR_distribution(head_angles, target_angles, target_angle_distribution):
    # find the closest angle in the target angle distribution for each head angle
    head_kernal_list = []
    for i in range(len(target_angles)):
        closest_target_angle_index = np.argmin(np.abs(target_angle_distribution[0] - target_angles[i]))
        # scaling the amplitude of each head angle by the corresponding target angle distribution
        head_kernal_list.append(GaussianKernal(head_angles[i], 3, 1 / target_angle_distribution[1][closest_target_angle_index]))
    t_arr = np.linspace(-100, 100, 1000)
    x_arr = np.zeros(t_arr.shape)
    for kernal in head_kernal_list:
        x_arr = kernal.cumulate(t_arr, x_arr)
    x_arr = x_arr/np.sum(x_arr)
    # find start and end range that contains 90% of the distribution
    cumulative_x_arr = np.cumsum(x_arr)
    start_index = np.argmin(np.abs(cumulative_x_arr - 0.05))
    end_index = np.argmin(np.abs(cumulative_x_arr - 0.95))
    start_angle = t_arr[start_index]
    end_angle = t_arr[end_index]

    return t_arr, x_arr, start_angle, end_angle
def get_CHOR(fixation_angles, head_fixation_angles, angle_to_ignore=0):
    is_not_zero = np.where(np.abs(fixation_angles-angle_to_ignore) <= 1E-3, False, True)
    target_angle_distribution = generate_smooth_distribution(fixation_angles[is_not_zero])
    head_angle_distribution = generate_CHOR_distribution(head_fixation_angles[is_not_zero], fixation_angles[is_not_zero], target_angle_distribution)
    CHOR_start, CHOR_end = head_angle_distribution[2], head_angle_distribution[3]
    return CHOR_start, CHOR_end, target_angle_distribution, head_angle_distribution



In [5]:
def align_target_and_results(collected_gaze, collected_duration, target_gaze, target_duration):
    deletion_index = -1
    for i in range(0, len(target_duration)):
        if np.abs(collected_gaze[i] - target_gaze[i]) > 1E-3:
            deletion_index = i
            break

    if deletion_index < 0:
        return target_gaze, target_duration
    else:
        new_target_gaze = np.concatenate([target_gaze[0:deletion_index], target_gaze[deletion_index+1:]])
        new_target_duration = np.concatenate([target_duration[0:deletion_index], target_duration[deletion_index+1:]])
        return new_target_gaze, new_target_duration

## parse data

In [6]:
def align_target_and_results(collected_gaze, collected_duration, target_gaze, target_duration):
    deletion_index = -1
    for i in range(0, len(target_duration)):
        if np.abs(collected_gaze[i] - target_gaze[i]) > 1E-3:
            deletion_index = i
            break

    if deletion_index < 0:
        return target_gaze, target_duration
    else:
        new_target_gaze = np.concatenate([target_gaze[0:deletion_index], target_gaze[deletion_index+1:]])
        new_target_duration = np.concatenate([target_duration[0:deletion_index], target_duration[deletion_index+1:]])
        return new_target_gaze, new_target_duration
def get_angle_per_instance(head_dir, eye_dir, target_pos, target_input, three_target=False):
        # prevent the angle from going to 360
    head_dir[:, 1:] = np.where(head_dir[:, 1:] > 180, head_dir[:, 1:] - 360, head_dir[:, 1:]) 
    eye_dir[:, 1:] = np.where(eye_dir[:, 1:] > 180, eye_dir[:, 1:] - 360, eye_dir[:, 1:])     
    
    # get the target directions
    target_dir = np.zeros(target_pos.shape)
    target_dir[:, 0] = target_pos[:, 0]
    for i in range(target_pos.shape[0]):
        # get the object rotation from scipy
        vec = target_pos[i, 1:]
        vec = vec / np.linalg.norm(vec)
        rot_y = np.sign(vec[0]) * (90 - np.arctan(abs(vec[2])/abs(vec[0])) * 180 / np.pi)
        # print(rot_y, target_dir[i, 2])
        target_dir[i, 2] = -rot_y
    # plt.plot(target_dir[:, 2].tolist())
    # note the dispersion is originally for a 2D input array
    target_fixations, target_fixations_intervals = dispersion_filtering(target_dir[:, 0], target_dir[:, 1:], 0.1, duraiton_threshold=0.0001)
    # find the number of nans in target_dir
    target_fixations_angles = target_fixations[:, 1]
    target_fixation_duration = target_fixations[:, 2]
    # get the combined head and eye angles
    gaze_dir = eye_dir + head_dir
    gaze_fixations, gaze_fixations_intervals = dispersion_filtering(gaze_dir[:, 0], gaze_dir[:, 1:], 3, duraiton_threshold=0.2)
    gaze_fixations_angles = gaze_fixations[:, 1]
    gaze_fixation_duration = gaze_fixations[:, 2]


    # get the target durations from the experimental setup file
    target_present_duration = target_input["duration"]
    # valid_target_list = np.array(target_input["isTarget"])
    # valid_target_list = np.where(valid_target_list==1, True, False)
    target_present_duration = np.array(target_present_duration)
    # target_present_duration = target_present_duration[valid_target_list]
    target_fixation_angles = target_input["angle"]
    target_fixation_angles = np.array(target_fixation_angles)
    # target_fixation_angles = target_fixation_angles[valid_target_list]
    head_fixation_angles, fixation_angles, fixation_duration = get_head_angles(head_dir, target_dir, target_fixations_intervals, target_fixation_duration, persistant_target_angle=0)
    # remove all the zero angles of collected data
    fixation_angles = np.array(fixation_angles)
    fixation_duration = np.array(fixation_duration)
    head_fixation_angles = np.array(head_fixation_angles)
    target_fixation_angles, target_present_duration = align_target_and_results(fixation_angles, fixation_duration, target_fixation_angles, target_present_duration)
    
    if three_target:
        # make sure the structure nice
        added_index = []
        minus_index = []
        off_set = 0
        for i in range(0, int(len(target_fixation_angles)/3)):
            if target_fixation_angles[3*i+1+off_set] in set([20, -40]):
                continue
            else:
                if target_fixation_angles[np.minimum(3*i+off_set, len(target_fixation_angles)-1)] in set([20, -40]):
                    added_index.append(3*i)
                    off_set += 1
                elif target_fixation_angles[np.minimum(3*i+2+off_set, len(target_fixation_angles)-1)] in set([20, -40]):
                    minus_index.append(3*i+2)
            
        target_fixation_angles = np.insert(target_fixation_angles, added_index, 0, axis=0)
        target_present_duration = np.insert(target_present_duration, added_index, 1, axis=0)
        head_fixation_angles = np.insert(head_fixation_angles, added_index, 0, axis=0)
        fixation_angles = np.insert(fixation_angles, added_index, 0, axis=0)        
    return target_fixation_angles, target_fixation_duration, fixation_angles, head_fixation_angles, target_present_duration

def get_interval_per_instance(head_dir, eye_dir, target_pos, target_input, three_target=False):
    head_dir[:, 1:] = np.where(head_dir[:, 1:] > 180, head_dir[:, 1:] - 360, head_dir[:, 1:]) 
    eye_dir[:, 1:] = np.where(eye_dir[:, 1:] > 180, eye_dir[:, 1:] - 360, eye_dir[:, 1:])     
    
    # get the target directions
    target_dir = np.zeros(target_pos.shape)
    target_dir[:, 0] = target_pos[:, 0]
    for i in range(target_pos.shape[0]):
        # get the object rotation from scipy
        vec = target_pos[i, 1:]
        vec = vec / np.linalg.norm(vec)
        rot_y = np.sign(vec[0]) * (90 - np.arctan(abs(vec[2])/abs(vec[0])) * 180 / np.pi)
        # print(rot_y, target_dir[i, 2])
        target_dir[i, 2] = -rot_y

    # note the dispersion is originally for a 2D input array
    target_fixations, target_fixations_intervals = dispersion_filtering(target_dir[:, 0], target_dir[:, 1:], 0)
    target_fixations_angles = target_fixations[:, 1]
    target_fixation_duration = target_fixations[:, 2]
    target_fixation_angles = target_input["angle"]
    target_fixation_angles = np.array(target_fixation_angles)
    target_present_duration = target_input["duration"]

    head_fixation_angles, fixation_angles, fixation_duration = get_head_angles(head_dir, target_dir, target_fixations_intervals, target_fixation_duration, persistant_target_angle=0)
    
    # remove all the zero angles of collected data
    fixation_angles = np.array(fixation_angles)
    fixation_duration = np.array(fixation_duration)
    head_fixation_angles = np.array(head_fixation_angles)
    
    
    target_fixation_angles, target_present_duration = align_target_and_results(fixation_angles, fixation_duration, target_fixation_angles, target_present_duration)


    if three_target:
        # make sure the structure is nice
        added_index = []
        minus_index = []
        off_set = 0
        for i in range(0, int(len(target_fixation_angles)/3)):
            if target_fixation_angles[3*i+1+off_set] in set([20, -40]):
                continue
            else:
                if target_fixation_angles[np.minimum(3*i+off_set, len(target_fixation_angles)-1)] in set([20, -40]):
                    added_index.append(3*i)
                    off_set += 1
                elif target_fixation_angles[np.minimum(3*i+2+off_set, len(target_fixation_angles)-1)] in set([20, -40]):
                    minus_index.append(3*i+2)
        for i in range(0, len(added_index)):
            list.insert(target_fixations_intervals, added_index[len(added_index)-1-i], [0, 0])
        return target_fixations_intervals
    else:
        return target_fixations_intervals

# target_fixation_intervals = get_interval_per_instance(all_head_data[0], all_eye_data[0], all_target_data[0], all_target_input[0], True)


# Input all The data For the onward experiment

In [7]:
input_dirs = ["/Users/evanpan/Desktop/Experiment_related_files/Masters_paper_recordings/Experiment_0/collectedData",
              "/Users/evanpan/Desktop/Experiment_related_files/Masters_paper_recordings/Experiment_1/collectedData",
              "/Users/evanpan/Desktop/Experiment_related_files/Masters_paper_recordings/Experiment_2/collectedData"]
all_trials = []
recording_file_names = ['double_look_condition_pilot', "onward_condition_pilot", "tennis_condition_pilot", "random_condition"]
for input_dir in input_dirs:
    # to store the trials of a single subject
    trial_per_subject = {}
    dirs = os.listdir(input_dir)
    # get duration of each trial:
    total_length = 0
    data_types = ["gaze_dir", "head_dir", "head_pos", "target_pos"]
    for fileName in recording_file_names:
        all_eye_data = []
        all_head_data = []
        all_target_data = []
        all_target_input = []

        data_type = "gaze_dir"
        fullPath = os.path.join(input_dir, data_type + "_" + fileName + ".csv")
        eye_data = np.genfromtxt(fullPath, delimiter=',', dtype=float)
        
        data_type = "head_dir"
        fullPath = os.path.join(input_dir, data_type + "_" + fileName + ".csv")
        head_data = np.genfromtxt(fullPath, delimiter=',', dtype=float)
        
        data_type = "target_pos"
        fullPath = os.path.join(input_dir, data_type + "_" + fileName + ".csv")
        target_data = np.genfromtxt(fullPath, delimiter=',', dtype=float)
        
        all_eye_data.append(eye_data)
        all_head_data.append(head_data)
        all_target_data.append(target_data)
        
        setupFileName = os.path.join(*[input_dir, fileName+".json"])
        target_input = json.load(open(setupFileName, "r"))
        all_target_input.append(target_input)
        trial_per_subject[fileName] = [all_eye_data, all_head_data, all_target_data, all_target_input]
    all_trials.append(trial_per_subject)

FileNotFoundError: [WinError 3] The system cannot find the path specified: '/Users/evanpan/Desktop/Experiment_related_files/Masters_paper_recordings/Experiment_0/collectedData'

In [None]:
# Write a class that store head_trajectory, eye_trajectory, target_trajectory an input_target_times, input_target_angle
# the class should also read and write these data to a json file

class ExperimentData:
    # the init can be from the list of data or no information and read from file
    def __init__(self, head_trajectory, eye_trajectory, target_trajectory, input_target_times, input_target_angle):
        self.head_trajectory = head_trajectory
        self.eye_trajectory = eye_trajectory
        self.target_trajectory = target_trajectory
        self.input_target_times = input_target_times
        self.input_target_angle = input_target_angle
    def write_to_json(self, file_name):
        data = {}
        # self.head_trajectory is a 2D numpy array, convert that to be serializable with Json
        head_trajectory = []
        for i in range(0, self.head_trajectory.shape[0]):
            head_trajectory.append(self.head_trajectory[i].tolist())
        # do the sake for eye_trajectory and target_trajectory
        eye_trajectory = []
        for i in range(0, self.eye_trajectory.shape[0]):
            eye_trajectory.append(self.eye_trajectory[i].tolist())
        target_trajectory = []
        for i in range(0, self.target_trajectory.shape[0]):
            target_trajectory.append(self.target_trajectory[i].tolist())



        data["head_trajectory"] = head_trajectory
        data["eye_trajectory"] = eye_trajectory
        data["target_trajectory"] = target_trajectory
        data["input_target_times"] = self.input_target_times
        data["input_target_angle"] = [int(x) for x in self.input_target_angle]
        with open(file_name, 'w') as outfile:
            json.dump(data, outfile)
    # class method to read from json
    @classmethod
    def read_from_json(self, file_name):
        data = json.load(open(file_name, "r"))
        self.head_trajectory = np.array(data["head_trajectory"])
        self.eye_trajectory = np.array(data["eye_trajectory"])
        self.target_trajectory = np.array(data["target_trajectory"])
        self.input_target_times = np.array(data["input_target_times"])
        self.input_target_angle = np.array(data["input_target_angle"])
        return self

In [None]:
for trial_type in recording_file_names:
# if True:
    # trial_type = recording_file_names[i]
    for i in range(len(all_trials)):
        eye_data = all_trials[i][trial_type][0]
        head_data = all_trials[i][trial_type][1]
        target_data = all_trials[i][trial_type][2]
        target_input = all_trials[i][trial_type][3]

        # get the presented target angles and durations:
        target_fixation_angles, target_fixation_duration, fixation_angles, head_fixation_angles, target_present_duration = get_angle_per_instance(head_data[0], eye_data[0], target_data[0], target_input[0], three_target=False)
        # get the intervals of the target fixations
        target_fixation_intervals = get_interval_per_instance(head_data[0], eye_data[0], target_data[0], target_input[0], three_target=False)
        ts = np.array(head_data)[0][:, 0]
        ts = ts - ts[0]
        
        # get the angle of presentation
        input_target_times = []
        input_target_angle = []
        for m in range(0, len(target_fixation_intervals)):
            input_target_times.append(ts[target_fixation_intervals[m][0]])
            input_target_angle.append(target_fixation_angles[m])
        
        # get the trajectories:
        head_trajectory = head_data[0]
        eye_trajectory = eye_data[0]
        target_trajectory = target_data[0]
        # write the data to a json file
        experiment_data = ExperimentData(head_trajectory, eye_trajectory, target_trajectory, input_target_times, input_target_angle)
        file_store_path = "/Volumes/EVAN_DISK/MASC/EYE_IK/ground_truth/"
        print("{}_{}.json".format(input_dirs[i].split("/")[-2], trial_type))
        experiment_data.write_to_json(os.path.join(*[file_store_path, "{}_{}.json".format(input_dirs[i].split("/")[-2], trial_type)]))
        


  rot_y = np.sign(vec[0]) * (90 - np.arctan(abs(vec[2])/abs(vec[0])) * 180 / np.pi)
  rot_y = np.sign(vec[0]) * (90 - np.arctan(abs(vec[2])/abs(vec[0])) * 180 / np.pi)


Experiment_0_double_look_condition_pilot.json
Experiment_1_double_look_condition_pilot.json
Experiment_2_double_look_condition_pilot.json
Experiment_0_onward_condition_pilot.json
Experiment_1_onward_condition_pilot.json
Experiment_2_onward_condition_pilot.json
Experiment_0_tennis_condition_pilot.json
Experiment_1_tennis_condition_pilot.json
Experiment_2_tennis_condition_pilot.json
Experiment_0_random_condition.json
Experiment_1_random_condition.json
Experiment_2_random_condition.json
