In [1]:
import numpy as np
import warnings
warnings.filterwarnings("ignore")

DATA_DIR = "/Users/timberzhang/Documents/Documents/2024-JustWatch/Data"
FILL_NAN = np.nan
CLUSTER_MIN_EPS = 25
CLUSTER_MIN_SAMPLE = 5
MIN_DTW_WINDOW = 2

VR_SCALE = 0.001207812
VR_ZDIST = 1
EYE_SAMPLE_RATE = 50
EYE_SAMPLE_TIME = (1 / EYE_SAMPLE_RATE) * 1000
SCREEN_SIZE = [1280, 720]

VIDEO_SIZE = [1920, 1080]
VIDEO_FPS = 30
BALL_TRAJ = None

# DataLoader

In [2]:
import os

file_list = list(os.walk(DATA_DIR))[0][2]
file_list = list(filter(lambda x: "AD" in x, file_list))
file_list

['24071512_AD.csv',
 '24070902_AD.csv',
 '24071009_AD.csv',
 '24072825_AD.csv',
 '24070904_AD.csv',
 '24071617_AD.csv',
 '24071721_AD.csv',
 '24071619_AD.csv',
 '24071615_AD.csv',
 '24071011_AD.csv',
 '24070906_AD.csv',
 'Test_AD.csv',
 '24071513_AD.csv',
 '24072926_AD.csv',
 '24071008_AD.csv',
 '24070901_AD.csv',
 '24072023_AD.csv',
 '24071616_AD.csv',
 '24070905_AD.csv',
 '24070907_AD.csv',
 '24071010_AD.csv',
 '24071618_AD.csv']

In [3]:
drop_list = ['pingpang.csv', 'tennis.csv', '.DS_Store', 'ControlGroupInfo.xlsx',]

In [4]:
import pandas as pd


def fetch_data(dir_path:str, file_list:list, drop_list:list=[]):
    
    def fetch_eye_data(_raw_eye_data:str):
        _eye_data_rows = _raw_eye_data.split(";")
        eye_data = {}   
        names = _eye_data_rows[0].split(" ")
        for idx, row in enumerate(_eye_data_rows):
            if idx == 0 : continue

            cur_data = row.split(" ")
            try:
                eye_data[idx] = {
                    names[0] : FILL_NAN if cur_data[0]=="NaN" else float(cur_data[0]),
                    names[1] : FILL_NAN if cur_data[0]=="NaN" else float(cur_data[1]),
                    names[2] : FILL_NAN if cur_data[0]=="NaN" else float(cur_data[2]),
                    names[3] : FILL_NAN if cur_data[0]=="NaN" else float(cur_data[3]),
                }
            except:
                eye_data[idx] = {
                    names[0] : FILL_NAN,
                    names[1] : FILL_NAN,
                    names[2] : FILL_NAN,
                    names[3] : FILL_NAN,
                }

        return eye_data


    _all_data = {}
    for file in file_list:
        if file in drop_list: continue
        _single_person_data_df = pd.read_csv(os.path.join(dir_path, file))
        _single_person_data_dict = {}
        _videos = {}

        for _, row in _single_person_data_df.iterrows():
            _eye_data = fetch_eye_data(row["EyeData"])
            v_id = row["videoName"]
            if v_id in _single_person_data_dict.keys(): 
                _videos[v_id] += 1
                v_id = v_id + f"_{_videos[v_id]}"
            else:
                _videos[v_id] = 1
            _single_person_data_dict[v_id] = _eye_data
        
        _all_data[file.split(".")[0]] = _single_person_data_dict
    
    return _all_data

In [5]:
all_data = fetch_data(DATA_DIR, file_list, drop_list)

In [6]:
def fetch_trajectory(dir_path:str):
    res = {}

    p_traj = pd.read_csv(os.path.join(dir_path, "pingpang.csv"))
    w_traj = pd.read_csv(os.path.join(dir_path, "tennis.csv"))
    _temp_all = pd.concat([p_traj, w_traj], axis=0)
    _temp_all["position"] = _temp_all["position"].apply(lambda x : x.replace(" ", ""))

    for v_id, v_df in _temp_all.groupby(by="video_name"):

        _x = v_df["position"].apply(lambda x : x.split(",")[0][1:]).to_numpy(dtype=np.int16)
        _y = v_df["position"].apply(lambda x : x.split(",")[1][:-1]).to_numpy(dtype=np.int16)

        # reverse the y-axis
        _y = (_y - VIDEO_SIZE[1]) * -1

        # scale and shift
        _x = (_x / VIDEO_SIZE[0]) * SCREEN_SIZE[0] - (SCREEN_SIZE[0] // 2)
        _y = (_y / VIDEO_SIZE[1]) * SCREEN_SIZE[1] - (SCREEN_SIZE[1] // 2)

        _f = v_df["frame_number"].to_numpy(dtype=np.int16)
        _r = v_df["round"].to_numpy(dtype=np.int16)
        _temp_df = pd.DataFrame(
            np.array([_f, _x, _y, _r])
        ).T
        _temp_df.index = range(1, _f.size+1)
        _temp_df.columns = ["frame", "Ball.x", "Ball.y", "round"]

        res[v_id] = _temp_df.to_dict()

    return res

In [7]:
BALL_TRAJ = fetch_trajectory(DATA_DIR)

# Data Preprocesing

In [8]:
# from scipy.signal import savgol_filter

# def smooth_series(series:pd.Series, window_length:int=5, polyorder:int=1):
#     # Interpolate the missing values
#     series_interpolated = series.interpolate(method='linear')
    
#     # Apply a Savitzky-Golay filter for smoothing
#     smoothed_series = savgol_filter(series_interpolated, window_length=window_length, polyorder=polyorder)
    
#     return smoothed_series

# def smooth_data(data:dict):
#     temp = pd.DataFrame(data=data).T
#     temp["Screen.x"] = smooth_series(temp["Screen.x"])
#     temp["Screen.y"] = smooth_series(temp["Screen.y"])
#     return temp.T.to_dict()

In [9]:
# for _person in all_data.keys():
#     for _video in all_data[_person].keys():
#         all_data[_person][_video] = smooth_data(all_data[_person][_video])

# Feature Extraction

## Saccade Feature

In [10]:
from sklearn.cluster import DBSCAN


def max_circle_radius(df:pd.DataFrame):
    centroid_x = df['Screen.x'].mean()
    centroid_y = df['Screen.y'].mean()
    
    distances = np.sqrt((df['Screen.x'] - centroid_x)**2 + (df['Screen.y'] - centroid_y)**2)
    
    max_radius = distances.max()

    if max_radius < VR_SCALE:
        return VR_SCALE
    else:
        return max_radius


def compute_stat(name:str, data_list:list):
    _res = {}
    _res[f"{name}_Mean"] = np.mean(data_list)
    _res[f"{name}_Max"] = np.max(data_list)
    _res[f"{name}_Min"] = np.min(data_list)
    _res[f"{name}_Std"] = np.std(data_list)

    return _res


def compute_saccade_path(df:pd.DataFrame):
    _speeds = []
    _angles = []

    for _c in range(0, df["c"].max()):
        _temp_t1 = df[df["c"]==_c]
        _temp_t2 = df[df["c"]==_c+1]
        
        c_x_t1 = _temp_t1["Screen.x"].mean()
        c_y_t1 = _temp_t1["Screen.y"].mean()
        c_x_t2 = _temp_t2["Screen.x"].mean()
        c_y_t2 = _temp_t2["Screen.y"].mean()

        _dura = ((_temp_t2["t"].mean() - _temp_t1["t"].mean()) * EYE_SAMPLE_TIME ) / 1000.0
        _dist = np.sqrt((c_x_t1 - c_x_t2)**2 + (c_y_t1 - c_y_t2)**2)
        _angle = np.arctan(_dist / VR_ZDIST) / np.pi * 180
        _angles.append(_angle)

        if _dura <= EYE_SAMPLE_TIME / 1000.0:
            _speeds.append(0)
        else:
            _speeds.append( np.divide(_angle, _dura))

    return _speeds, _angles


def extract_saccade_features(data, video_id):
    _fea = {}
    temp = np.array([data["Screen.x"], data["Screen.y"], data.index.to_numpy()]).T
    temp = pd.DataFrame(temp)
    temp.columns=["Screen.x", "Screen.y", "t"]

    db = DBSCAN(eps=CLUSTER_MIN_EPS, min_samples=CLUSTER_MIN_SAMPLE)
    cluster_res = db.fit(temp.dropna())
    clusters = pd.value_counts(cluster_res.labels_)

    if clusters.shape[0] <= 2: 
        return _fea

    # _not_na_index = temp.dropna().index
    # temp["c"] = -1
    # temp.loc[_not_na_index, "c"] = cluster_res.labels_
    temp["c"] = cluster_res.labels_
    ## map the pixel value to real distance
    temp["x"] = temp["x"] * VR_SCALE
    temp["y"] = temp["y"] * VR_SCALE

    _fea["NumOfGazePoints"] = len(clusters.index) - 1
    # "_g" for gaze
    _g_duration = []
    _g_radius = []
    _g_density = []
    _g_density_t = []
    for c, v in clusters.items():
        if c == -1: continue
        _duration = v * EYE_SAMPLE_TIME
        _g_duration.append(_duration)
        _circle_r = max_circle_radius(temp[temp["c"]==c]) * 100
        _g_radius.append(_circle_r)
        _density = v / (np.pi * np.power(_circle_r,2))
        _g_density.append(_density)
        _g_density_t.append(_density / _duration)

    _speeds,_angles = compute_saccade_path(temp)

    _fea = {**_fea, **compute_stat("GazeRadius", _g_radius)}
    _fea = {**_fea, **compute_stat("GazeDuratiuon", _g_duration)}
    _fea = {**_fea, **compute_stat("GazeDensity", _g_density)}
    _fea = {**_fea, **compute_stat("GazeDensityFrequency", _g_density_t)}

    _fea = {**_fea, **compute_stat("SaccadeSpeed", _speeds)}
    _fea = {**_fea, **compute_stat("SaccadeAngel", _angles)}

    return _fea

In [11]:
def compute_saccade_path_lite(df:pd.DataFrame):
    _speeds = [0]
    _angles = [0]
    indices = list(df.index)
    _dura = (1 * EYE_SAMPLE_TIME ) / 1000.0
    df["Screen.x"] = df["Screen.x"] * VR_SCALE
    df["Screen.y"] = df["Screen.y"] * VR_SCALE

    for _i in range(1, len(indices)):
        row_1 = df.loc[indices[_i - 1], :]
        row_2 = df.loc[indices[_i], :]

        _dist = np.sqrt((row_1["Screen.x"] - row_2["Screen.x"])**2 + (row_1["Screen.y"] - row_2["Screen.y"])**2)
        _angle = np.arctan(_dist / VR_ZDIST) / np.pi * 180

        _angles.append(_angle)
        _speeds.append(np.divide(_angle, _dura))

    return _speeds, _angles


def compute_drop_point_bias_pred(eye_data:pd.DataFrame, eye_index, ball_data:pd.DataFrame, ball_round, eye_drop="last"):
    if eye_drop=="last":
        eye_init = eye_data.iloc[0, :]
        eye_final = eye_data.iloc[-1, :]
    elif eye_drop=="fast":
        idx_min = eye_data.index.min()
        eye_init = eye_data.loc[eye_index-1 if eye_index-1>idx_min else idx_min, :]
        eye_final = eye_data.loc[eye_index, :]
    ball_init = ball_data[ball_data["round"]==ball_round].iloc[0, :]
    ball_final = ball_data[ball_data["round"]==ball_round].iloc[-1, :]

    init_bias = np.sqrt((ball_init["Ball.x"] - eye_init["Screen.x"])**2 + (ball_init["Ball.y"] - eye_init["Screen.y"])**2)
    final_bias = np.sqrt((ball_final["Ball.x"] - eye_final["Screen.x"])**2 + (ball_final["Ball.y"] - eye_final["Screen.y"])**2)
    return init_bias, final_bias


def compute_drop_point_bias_delay(eye_data:pd.DataFrame, eye_index, ball_data:pd.DataFrame, ball_round, eye_drop="last"):
    if eye_drop=="last":
        eye_init = eye_data.iloc[0, :]
        eye_final = eye_data.iloc[-1, :]
    elif eye_drop=="fast":
        idx_min = eye_data.index.min()
        eye_init = eye_data.loc[eye_index-1 if eye_index-1>idx_min else idx_min, :]
        eye_final = eye_data.loc[eye_index, :]
    ball_init = ball_data[ball_data["round"]==ball_round-1].iloc[0, :]
    ball_final = ball_data[ball_data["round"]==ball_round-1].iloc[-1, :]

    init_bias = np.sqrt((ball_init["Ball.x"] - eye_init["Screen.x"])**2 + (ball_init["Ball.y"] - eye_init["Screen.y"])**2)
    final_bias = np.sqrt((ball_final["Ball.x"] - eye_final["Screen.x"])**2 + (ball_final["Ball.y"] - eye_final["Screen.y"])**2)
    return init_bias, final_bias



## Trajectory Features

In [12]:
from tslearn.metrics import dtw as ts_dtw
from scipy.spatial.distance import cdist


## eps measured as pixel
def compute_traj_overlap(concat_df:pd.DataFrame, eps:float=15.0):
    concat_df["overlap"] = concat_df.apply(
            lambda row : 1 if np.sqrt((row["Screen.x"] - row["Ball.x"])**2 + (row["Screen.y"] - row["Ball.y"])**2) <= eps else 0,
            axis=1
        )
    return concat_df["overlap"].sum() / concat_df.shape[0]


def compute_traj_range_overlap(concat_df:pd.DataFrame, eps:float=15.0, time_range=25):
    concat_df["overlap"] = 0
    for idx, row in concat_df.iterrows():
        _temp_df = concat_df.loc[idx-time_range:idx+time_range, :]
        _temp_df["overlap"] = _temp_df.apply(
            lambda b_row : 1 if np.sqrt((row["Screen.x"] - b_row["Ball.x"])**2 + (row["Screen.y"] - b_row["Ball.y"])**2) <= eps else 0,
            axis=1
        )
        if _temp_df["overlap"].sum() > 0:
            concat_df.loc[idx, "overlap"] = 1
    return concat_df["overlap"].sum() / concat_df.shape[0]


def compute_dtw(line_1:pd.DataFrame, line_2:pd.DataFrame, scale_to_percentage=False, scale_metrics="mean"):
    assert line_1.shape[-1] == line_2.shape[-1]

    res = ts_dtw(line_1, line_2)

    if scale_to_percentage:
        mat = cdist(line_1, line_2)
        if scale_metrics=="max":
            res = (res / mat.max()) *100
        elif scale_metrics=="start":
            scale = np.sqrt((line_1.iloc[0, :]["Screen.x"] - line_2.iloc[0, :]["Ball.x"])**2 + (line_1.iloc[0, :]["Screen.y"] - line_2.iloc[0, :]["Ball.y"])**2)
            res = (res / scale) * 100
        elif scale_metrics=="end":
            scale = np.sqrt((line_1.iloc[-1, :]["Screen.x"] - line_2.iloc[-1, :]["Ball.x"])**2 + (line_1.iloc[-1, :]["Screen.y"] - line_2.iloc[-1, :]["Ball.y"])**2)
            res = (res / scale) * 100
        else:
            res = (res / mat.mean()) *100 

    return res


def compute_gradient_dtw(line_1:pd.DataFrame, line_2:pd.DataFrame, order:int=1, time_scale=1, scale_to_percentage=False):
    assert line_1.shape == line_2.shape
    assert "time" in line_1.columns or "frame" in line_1.columns, "Please include index, time, or frame (named 'time') in the line_1 DataFrame"
    assert "time" in line_2.columns or "frame" in line_2.columns, "Please include index, time, or frame (named 'time') in the line_2 DataFrame"
    assert order>=1, "Gradient order must larger or equal to 1"
    
    ## convert frame to time
    if "frame" in line_1.columns:
        line_1["time"] = line_1["frame"] * EYE_SAMPLE_TIME * time_scale
    if "frame" in line_2.columns:
        line_2["time"] = line_2["frame"] * EYE_SAMPLE_TIME * time_scale

    ## create gradient column
    line_1_grad_col, line_2_grad_col = [], []
    for col in line_1.columns:
        if col == "time": continue
        line_1[f"{col}.grad"] = line_1[col]
        line_1_grad_col.append(f"{col}.grad")
    for col in line_2.columns:
        if col == "time": continue
        line_2[f"{col}.grad"] = line_2[col]
        line_2_grad_col.append(f"{col}.grad")
    
    ## compute the gradient
    for _ in range(0, order):
        for col in line_1_grad_col:
            line_1[col] = np.gradient(line_1[col], line_1["time"])
        for col in line_2_grad_col:
            line_2[col] = np.gradient(line_2[col], line_2["time"])

    return compute_dtw(line_1[line_1_grad_col], line_2[line_2_grad_col], scale_to_percentage=scale_to_percentage)


In [13]:
def interplate_and_align(base_df:pd.DataFrame, align_df:pd.DataFrame, base_rate:int, align_rate:int, convert_dist=False):
    share_rate = abs(base_rate * align_rate) // np.gcd(base_rate, align_rate)
    
    align_df["frame"] = align_df["frame"] * (share_rate // align_rate)
    temp_index= np.array(range(1, int(base_df["frame"].max()+1) * (share_rate // base_rate)))

    temp_time_df = pd.DataFrame(temp_index, index=temp_index, columns=["frame"])
    _index = temp_time_df.index
    temp_time_df = temp_time_df.merge(align_df, how="left", on="frame")
    temp_time_df.index = _index
    ## interplate
    for col in temp_time_df.columns:
        if col=="round": continue
        temp_time_df[col] = temp_time_df[col].interpolate(method='linear')
    ## ensure first row is not empty
    temp_time_df.ffill(inplace=True)
    temp_time_df.bfill(inplace=True)
    
    base_df["frame"] = base_df["frame"] * (share_rate // base_rate)
    _index = base_df.index
    alined_df = base_df.merge(temp_time_df, how="left", on="frame", )
    alined_df.index = _index
    alined_df["frame"] = alined_df["frame"] // (share_rate // base_rate)

    if convert_dist:
        alined_df = alined_df * VR_SCALE
        alined_df["frame"] = alined_df["frame"] // VR_SCALE

    alined_df.ffill(inplace=True)
    alined_df.bfill(inplace=True)

    return alined_df


def extract_trajectory(eye_data_df:pd.DataFrame, video_id, scale_to_percentage=False):
    res = {}
    if not BALL_TRAJ: 
        return res

    eye_data_df["frame"] = eye_data_df.index
    ball_data_df = pd.DataFrame(BALL_TRAJ[video_id])
    
    aligned_df = interplate_and_align(eye_data_df, ball_data_df, EYE_SAMPLE_RATE, VIDEO_FPS, convert_dist=False)
    # aligned_df.rename(columns={'x':'Ball.x', "y":"Ball.y"}, inplace=True)
    
    _traj_overlap = compute_traj_overlap(aligned_df)
    _traj_range_overlap = compute_traj_range_overlap(aligned_df)
    _all_dtw = compute_dtw(
            line_1=aligned_df.loc[:, ["Screen.x", "Screen.y"]],
            line_2=aligned_df.loc[:, ["Ball.x", "Ball.y"]],
            scale_to_percentage=scale_to_percentage
        )
    _all_speed_dtw = compute_gradient_dtw(
            line_1=aligned_df.loc[:, ["frame", "Screen.x", "Screen.y"]],
            line_2=aligned_df.loc[:, ["frame", "Ball.x", "Ball.y"]],
            order=1,
            scale_to_percentage=scale_to_percentage
    )
    _all_accelerate_dtw = compute_gradient_dtw(
            line_1=aligned_df.loc[:, ["frame", "Screen.x", "Screen.y"]],
            line_2=aligned_df.loc[:, ["frame", "Ball.x", "Ball.y"]],
            order=2,
            scale_to_percentage=scale_to_percentage
    )

    res["AllTrajectoryOverlap"] = _traj_overlap * 100
    res["AllTrajectoryRangeOverlap"] = _traj_range_overlap * 100
    if scale_to_percentage:
        res["AllTrajectoryDTW"] = _all_dtw
        res["AllTrajectoryDTWPerSec"] = _all_dtw / (eye_data_df.shape[0] * EYE_SAMPLE_TIME / 1000)
        res["AllTrajectorySpeedDTW"] = _all_speed_dtw
        res["AllTrajectorySpeedDTWPerSec"] = _all_speed_dtw / (eye_data_df.shape[0] * EYE_SAMPLE_TIME / 1000)
        res["AllTrajectoryAccelerateDTW"] = _all_accelerate_dtw
        res["AllTrajectoryAcceleratePerSec"] = _all_accelerate_dtw / (eye_data_df.shape[0] * EYE_SAMPLE_TIME / 1000)
    else:
        res["AllTrajectoryDTW"] = _all_dtw * VR_SCALE
        res["AllTrajectoryDTWPerSec"] = _all_dtw * VR_SCALE / (eye_data_df.shape[0] * EYE_SAMPLE_TIME / 1000)
        res["AllTrajectorySpeedDTW"] = _all_speed_dtw * VR_SCALE
        res["AllTrajectorySpeedDTWPerSec"] = _all_speed_dtw * VR_SCALE / (eye_data_df.shape[0] * EYE_SAMPLE_TIME / 1000)
        res["AllTrajectoryAccelerateDTW"] = _all_accelerate_dtw * VR_SCALE
        res["AllTrajectoryAccelerateDTWPerSec"] = _all_accelerate_dtw * VR_SCALE / (eye_data_df.shape[0] * EYE_SAMPLE_TIME / 1000)
    
    return res


In [14]:

def extract_saccade_features_lite(round_index, data:pd.DataFrame, ball_data_df):
    _fea = {}

    _speeds, _angles = compute_saccade_path_lite(data)
    _fea = {**_fea, **compute_stat("SaccadeSpeed", _speeds)}
    _fea = {**_fea, **compute_stat("SaccadeAngel", _angles)}

    round_start_index = ball_data_df[ball_data_df["round"]==round_index].index[0]
    eye_start_index = data.index[0]
    _fea["SaccadeDelay"] = (eye_start_index-round_index) * EYE_SAMPLE_TIME

    # _fea["SaccadeTimeShift"] = (np.argmax(_speeds) - (data.shape[0] // 2)) * EYE_SAMPLE_TIME

    # bias_res = compute_drop_point_bias_pred(
    #     eye_data=data,
    #     eye_index=data.index[np.argmax(_speeds)],
    #     ball_data=ball_data_df,
    #     ball_round=round_index
    # )
    # _fea["SaccadeDropBiasPred_Init"] = bias_res[0] * VR_SCALE
    # _fea["SaccadeDropBiasPred_Final"] = bias_res[1] * VR_SCALE

    # bias_res = compute_drop_point_bias_delay(
    #     eye_data=data,
    #     eye_index=data.index[np.argmax(_speeds)],
    #     ball_data=ball_data_df,
    #     ball_round=round_index
    # )
    # _fea["SaccadeDropBiasDelay_Init"] = bias_res[0] * VR_SCALE
    # _fea["SaccadeDropBiasDelay_Final"] = bias_res[1] * VR_SCALE

    return _fea

In [15]:
def extract_trajectory_lite(round_index, data:pd.DataFrame, ball_data_df):
    _fea = {}
    
    _ball_df = ball_data_df[ball_data_df["round"]==round_index]
    _dtw = compute_dtw(
            line_1=data.loc[:, ["Screen.x", "Screen.y"]],
            line_2=_ball_df.loc[:, ["Ball.x", "Ball.y"]],
            # scale_to_percentage=True,
            # scale_metrics="start"
        )
    
    _fea["TrajectoryDTW"] = _dtw

    return _fea

## Find Pattern Match Round

In [16]:
from scipy.spatial import KDTree


def count_points_in_radius(ball_df:pd.DataFrame, eye_df:pd.DataFrame, radius):
    # Build a k-d tree from the points DataFrame
    points_tree = KDTree(eye_df[['Screen.x', 'Screen.y']])
    
    count_list = []
    
    for i, trajectory_point in ball_df.iterrows():
        # Query the k-d tree for points within the radius
        indices = points_tree.query_ball_point([trajectory_point['Ball.x'], trajectory_point['Ball.y']], r=radius)
        count_list += indices
    
    return list(set(count_list))

def find_match_round(eye_data_df:pd.DataFrame, video_id, radius=15, inner_percent=0.9):
    res = []
    if not BALL_TRAJ: 
        return res

    eye_data_df["frame"] = eye_data_df.index
    ball_data_df = pd.DataFrame(BALL_TRAJ[video_id])
    assert "round" in ball_data_df.columns, "Missing 'round' in ball trajectory file"

    aligned_df = interplate_and_align(eye_data_df, ball_data_df, EYE_SAMPLE_RATE, VIDEO_FPS, convert_dist=False)

    for _round in aligned_df["round"].value_counts().index:
        temp = aligned_df[aligned_df["round"]==_round]
        inner_counts = count_points_in_radius(temp.loc[:, ["Ball.x", "Ball.y"]], temp.loc[:, ["Screen.x", "Screen.y"]], radius=radius)

        # check_df = temp.iloc[inner_counts, :]
        # max_x, min_x = check_df["Screen.x"].max(), check_df["Screen.x"].min()
        # max_y, min_y = check_df["Screen.y"].max(), check_df["Screen.y"].min()
        # dis = np.sqrt((max_x - min_x)**2 + (max_y - min_y)**2)
        dis = radius

        if len(inner_counts) / temp.shape[0] > inner_percent and dis >= radius:
            res.append(_round)

    return res


def find_match_round_hit(eye_data_df:pd.DataFrame, video_id, time_range=10, dist=250):
    res ={}
    if not BALL_TRAJ: 
        return res

    eye_data_df["frame"] = eye_data_df.index
    ball_data_df = pd.DataFrame(BALL_TRAJ[video_id])
    assert "round" in ball_data_df.columns, "Missing 'round' in ball trajectory file"

    aligned_df = interplate_and_align(eye_data_df, ball_data_df, EYE_SAMPLE_RATE, VIDEO_FPS, convert_dist=False)

    aligned_df["hit"] = aligned_df["round"] - aligned_df["round"].shift(1, fill_value=1)
    hits_indices = aligned_df[aligned_df["hit"]==1].index

    for idx, _hit in enumerate(hits_indices):
        index_range = [_hit-time_range, _hit+time_range]
        temp = aligned_df.loc[index_range[0]:index_range[1], :]
        eye_max_dist = np.sqrt((temp["Screen.x"].max() - temp["Screen.x"].min())**2 + (temp["Screen.y"].max() - temp["Screen.y"].min())**2)

        if eye_max_dist >= dist:
            res[aligned_df.loc[_hit, "round"]] = list(temp.index)

    return res

In [17]:
from sklearn.preprocessing import minmax_scale


def given_minmax_scale(data:pd.DataFrame, minmax, feature_range=[0,1], axis=0):
    res = []
    for _i in data.index if axis==1 else data.columns:
        scale = (feature_range[1] - feature_range[0]) / (minmax[1] - minmax[0])
        X_scaled = scale * (data[_i] - minmax[0])
        res.append(X_scaled)

    return pd.DataFrame(res) if axis==1 else pd.DataFrame(res).T


def interplate_enlarge(ori_df:pd.DataFrame, size:int):
    ori_df = ori_df.reset_index(drop=True)
    ori_df.index = [int(np.round(i * (size / ori_df.shape[0]))) for i in ori_df.index ]
    new_index = pd.Index(range(0, min(size, ori_df.index[-1]+1)))
    # print(new_index)
    # print(size / ori_df.shape[0])
    ori_df = ori_df.reindex(new_index).interpolate()
    return ori_df


def find_match_round_dtw(eye_data_df:pd.DataFrame, video_id, order=0, scale_raw_data=False):
    res = {}
    dtw_res = {}
    if not BALL_TRAJ: 
        return res

    eye_data_df["frame"] = eye_data_df.index
    ball_data_df = pd.DataFrame(BALL_TRAJ[video_id])
    assert "round" in ball_data_df.columns, "Missing 'round' in ball trajectory file"

    aligned_df = interplate_and_align(eye_data_df, ball_data_df, EYE_SAMPLE_RATE, VIDEO_FPS, convert_dist=False)
    if scale_raw_data:
        aligned_df.loc[:, ["Screen.x", "Ball.x", "Screen.y", "Ball.y"]] = minmax_scale(aligned_df.loc[:, ["Screen.x", "Ball.x", "Screen.y", "Ball.y"]], axis=0)

    for _round in aligned_df["round"].value_counts(sort=False).index:
        temp = aligned_df[aligned_df["round"]==_round]

        match_indice = None
        min_dtw = np.inf
        for idx in temp.index:
            for window_size in range(MIN_DTW_WINDOW, temp.shape[0]):
                # temp_window = aligned_df.loc[_shift_idx:_shift_idx+window_size, :]
                temp_window = aligned_df.loc[ idx : idx + window_size, :]
                if temp_window.shape[0] < MIN_DTW_WINDOW: continue
                _indices = temp_window.index

                if temp_window.shape[0] < temp.shape[0]:
                    temp_window = interplate_enlarge(temp_window, size=temp.shape[0])

                if order==0:
                    _dtw = compute_dtw(
                        line_1 = temp.loc[:, ["Ball.x", "Ball.y"]],
                        line_2 = temp_window.loc[:, ["Screen.x", "Screen.y"]],
                        scale_to_percentage=False
                    )
                elif order>0:
                    _dtw = compute_gradient_dtw(
                            line_1=temp.loc[:, ["frame", "Ball.x", "Ball.y"]],
                            line_2=temp_window.loc[:, ["frame", "Screen.x", "Screen.y"]],
                            order=order,
                            scale_to_percentage=False
                    )
                else:
                    raise Exception("Order of gradient must be positive")
                
                if _dtw < min_dtw:
                    min_dtw = _dtw
                    match_indice = _indices
              
        dtw_res[_round] = min_dtw
        res[_round] = match_indice

    return res, dtw_res

In [18]:
def find_match_round_dtw_kmp(eye_data_df:pd.DataFrame, video_id, order=0, scale_raw_data=False):
    res = {}
    dtw_res = {}
    # dtw_value = []
    if not BALL_TRAJ: 
        return res

    eye_data_df["frame"] = eye_data_df.index
    ball_data_df = pd.DataFrame(BALL_TRAJ[video_id])
    assert "round" in ball_data_df.columns, "Missing 'round' in ball trajectory file"

    aligned_df = interplate_and_align(eye_data_df, ball_data_df, EYE_SAMPLE_RATE, VIDEO_FPS, convert_dist=False)
    if scale_raw_data:
        aligned_df.loc[:, ["Screen.x", "Ball.x", "Screen.y", "Ball.y"]] = minmax_scale(aligned_df.loc[:, ["Screen.x", "Ball.x", "Screen.y", "Ball.y"]], axis=0)

    for _round in aligned_df["round"].value_counts(sort=False).index:
        temp = aligned_df[aligned_df["round"]==_round]

        match_indice = None
        min_dtw = np.inf
        _shift_idx = temp.index[0]
        _last_idx = temp.index[0] + MIN_DTW_WINDOW
        # for window_size in range(MIN_DTW_WINDOW, temp.shape[0]):
        max_idx = list(aligned_df[(aligned_df["round"]==_round+1) | (aligned_df["round"]==_round)].index)[-1]

        while _last_idx < max_idx:
            # temp_window = aligned_df.loc[_shift_idx:_shift_idx+window_size, :]
            temp_window = aligned_df.loc[ _shift_idx : _last_idx, :]
            _indices = temp_window.index

            if temp_window.shape[0] < temp.shape[0]:
                temp_window = interplate_enlarge(temp_window, size=temp.shape[0])

            if temp_window.shape[0] < MIN_DTW_WINDOW: continue

            if order==0:
                _dtw = compute_dtw(
                    line_1 = temp.loc[:, ["Ball.x", "Ball.y"]],
                    line_2 = temp_window.loc[:, ["Screen.x", "Screen.y"]],
                    scale_to_percentage=False
                )
            elif order>0:
                _dtw = compute_gradient_dtw(
                        line_1=temp.loc[:, ["frame", "Ball.x", "Ball.y"]],
                        line_2=temp_window.loc[:, ["frame", "Screen.x", "Screen.y"]],
                        order=order,
                        scale_to_percentage=False
                )
            else:
                raise Exception("Order of gradient must be positive")
            
            _last_idx += 1
            if _dtw < min_dtw:
                min_dtw = _dtw
                match_indice = _indices
            else:
                _shift_idx = _last_idx
                _last_idx = _last_idx + MIN_DTW_WINDOW

        dtw_res[_round] = min_dtw
        res[_round] = match_indice

    return res, dtw_res

### Check Proper Hyper-Parameter for Test Round

In [19]:
def test_match_hit(data, video_id):
    data_df = pd.DataFrame(data).T
    data_df.ffill(inplace=True)
    data_df.bfill(inplace=True)
    
    if "_" in video_id:
        video_id = video_id.split("_")[0]

    res = {}
    for r in [5, 7, 10, 15]:
        for d in [100,200,300,350,400]:
            count_res = find_match_round_hit(data_df.loc[:, ["Screen.x", "Screen.y"]], video_id, time_range=r, dist=d)
            res[f"Time Range {r}-Dist {d}"] = len(count_res)
            # res[f"Radius {r} match round"] = count_res
    return res


In [20]:
def test_match_round(data, video_id):
    data_df = pd.DataFrame(data).T
    data_df.ffill(inplace=True)
    data_df.bfill(inplace=True)
    
    if "_" in video_id:
        video_id = video_id.split("_")[0]

    res = {}
    for r in [25, 50, 75, 100]:
        count_res = find_match_round(data_df.loc[:, ["Screen.x", "Screen.y"]], video_id, radius=r)
        res[f"Cover Radius {r}"] = len(count_res)
    return res

In [21]:
def test_match(method=""):
    all_people_match = {}
    for _person in all_data.keys():
        _person_fea = {}

        for _video in all_data[_person].keys():
            if method=="hit":
                _person_fea[_video] = test_match_hit(all_data[_person][_video], _video)
            elif method=="round":
                _person_fea[_video] = test_match_round(all_data[_person][_video], _video)
            else:
                raise Exception("methods not support")

        all_people_match[_person] = _person_fea
    
    return all_people_match

In [22]:
# import json

# if os.path.exists("match_round_res.json"):
#     with open("match_round_res.json", "r") as f:
#         match_round_res = json.load(f)
# else:
#     match_round_res = test_match("hit")
#     with open("match_round_res.json", "w") as f:
#         json.dump(match_round_res, f)

In [23]:
def measure_hit(match_round_res, measure="mean"):
    stat = None
    for idx, _f in enumerate(file_list):
        _p = _f.split(".")[0]
        if measure=="median":
            _p_stat = pd.DataFrame(match_round_res[_p]).T.median()
        elif measure=="min":
            _p_stat = pd.DataFrame(match_round_res[_p]).T.min()
        elif measure=="max":
            _p_stat = pd.DataFrame(match_round_res[_p]).T.max()
        else:
            _p_stat = pd.DataFrame(match_round_res[_p]).T.mean()

        if idx==0:
            stat = _p_stat
        else:
            stat.join(_p_stat)

        return stat

In [24]:
print("Pixel to Angle: 100->", np.arctan(100 * VR_SCALE / VR_ZDIST) / np.pi * 180)
print("Pixel to Angle: 200->", np.arctan(200 * VR_SCALE / VR_ZDIST) / np.pi * 180)
print("Pixel to Angle: 300->", np.arctan(300 * VR_SCALE / VR_ZDIST) / np.pi * 180)
print("Pixel to Angle: 350->", np.arctan(350 * VR_SCALE / VR_ZDIST) / np.pi * 180)
print("Pixel to Angle: 400->", np.arctan(400 * VR_SCALE / VR_ZDIST) / np.pi * 180)

Pixel to Angle: 100-> 6.886893402895522
Pixel to Angle: 200-> 13.58034668174109
Pixel to Angle: 300-> 19.91765997635866
Pixel to Angle: 350-> 22.915443016760896
Pixel to Angle: 400-> 25.786340252252728


In [25]:
print("Pixel to Angle: 360->", np.arctan(360 * VR_SCALE / VR_ZDIST) / np.pi * 180)
print("Pixel to Angle: -360->", np.arctan(-360 * VR_SCALE / VR_ZDIST) / np.pi * 180)
print("Pixel to Angle: 640->", np.arctan(640 * VR_SCALE / VR_ZDIST) / np.pi * 180)
print("Pixel to Angle: -640->", np.arctan(-640 * VR_SCALE / VR_ZDIST) / np.pi * 180)
print("Pixel to Angle: 734->", np.arctan(734 * VR_SCALE / VR_ZDIST) / np.pi * 180)
print("Pixel to Angle: -734->", np.arctan(-734 * VR_SCALE / VR_ZDIST) / np.pi * 180)

Pixel to Angle: 360-> 23.499997351668693
Pixel to Angle: -360-> -23.499997351668693
Pixel to Angle: 640-> 37.704011712943334
Pixel to Angle: -640-> -37.704011712943334
Pixel to Angle: 734-> 41.558079394134964
Pixel to Angle: -734-> -41.558079394134964


## Main extrat features function

In [26]:
def extract_features_round(rounds:dict, data_df:pd.DataFrame, video_id:str):
    res = {}
    ball_data_df = pd.DataFrame(BALL_TRAJ[video_id])
    for round_index, round_indices in rounds.items():
        round_res = extract_saccade_features_lite(round_index, data_df.loc[round_indices, ["Screen.x", "Screen.y"]], ball_data_df)
        # res[round_index] = round_res
        traj_res = extract_trajectory_lite(round_index, data_df.loc[round_indices, ["Screen.x", "Screen.y"]], ball_data_df)
        res[round_index] = {**round_res, **traj_res}
    return res

In [27]:
with open("yes.txt", "r") as f:
    temp = f.readlines()
LABEL_ROUNDS = {}
for _str in temp:
    _str = _str.replace("\n", "")
    _video_id = _str.split(":")[0]
    if len(_str.split(":")[1])==0: continue
    _rounds = [float(j) for j in _str.split(":")[1].split(";")]
    LABEL_ROUNDS[_video_id] = _rounds


def label_round_hit(eye_data_df:pd.DataFrame, video_id,):
    res ={}
    if not BALL_TRAJ: 
        return res

    eye_data_df["frame"] = eye_data_df.index
    ball_data_df = pd.DataFrame(BALL_TRAJ[video_id.split("_")[0]])
    assert "round" in ball_data_df.columns, "Missing 'round' in ball trajectory file"

    aligned_df = interplate_and_align(eye_data_df, ball_data_df, EYE_SAMPLE_RATE, VIDEO_FPS, convert_dist=False)
    try:
        _rounds = LABEL_ROUNDS[video_id]
    except:
        return {}
    
    for _r in _rounds:
        if video_id.startswith("p"):
            res[_r] = aligned_df[aligned_df["round"]==_r+1].index.to_list()
            if len(res[_r])==0:
                res[_r] = aligned_df[aligned_df["round"]==_r].index.to_list()
        else:
            res[_r] = aligned_df[aligned_df["round"]==_r].index.to_list()

    return res


In [28]:
def threshold_find_match_round_dtw(eye_data:pd.DataFrame, video_id, order, scale_raw_data, mode="fast", dtw_th=1, dist_th=10):
    if mode=="fast":
        rounds, dtw_res = find_match_round_dtw_kmp(eye_data_df=eye_data, video_id=video_id, order=order, scale_raw_data=scale_raw_data)
    elif mode=="greedy":
        rounds, dtw_res = find_match_round_dtw(eye_data_df=eye_data, video_id=video_id, order=order, scale_raw_data=scale_raw_data)

    res = {}
    for _round, _round_index in rounds.items():
        if (max_circle_radius(eye_data.loc[_round_index, :]) >= dist_th) and (dtw_res[_round] <= dtw_th):
            res[_round] = _round_index

    return res

In [29]:
def extract_features(data, video_id):
    data_df = pd.DataFrame(data).T
    data_df.ffill(inplace=True)
    data_df.bfill(inplace=True)
    
    # match_rounds = label_round_hit(data_df.loc[:, ["Screen.x", "Screen.y"]], video_id)


    if "_" in video_id:
        test_video = video_id.split("_")[0]
    else:
        test_video = video_id

    match_rounds = threshold_find_match_round_dtw(data_df, test_video, order=0, scale_raw_data=True, mode="fast")
    # match_rounds = find_match_round_hit(data_df.loc[:, ["Screen.x", "Screen.y"]], video_id, time_range=7, dist=300)
    saccade_features = extract_features_round(match_rounds, data_df.loc[:, ["Screen.x", "Screen.y"]], test_video)
    
    return saccade_features

    # saccade_features = extract_saccade_features(match_rounds, data_df.loc[:, ["Screen.x", "Screen.y"]], video_id)
    # trajectory_features = extract_trajectory(data_df.loc[:, ["Screen.x", "Screen.y"]], video_id, scale_to_percentage=True)

    # return {**saccade_features, **trajectory_features}

## Extract Features of all Participants

In [30]:
all_people_fea = {}
video_list= []
for _person in all_data.keys():
    if not _person == "24071512_AD": continue
    _person_fea = {}

    for _video in all_data[_person].keys():
        # if not _video == "p7": continue
        if not _video in video_list: video_list.append(_video)
        
        fea_res = extract_features(data=all_data[_person][_video], video_id=_video)
        if fea_res:
            _person_fea[_video] = fea_res

    all_people_fea[_person] = _person_fea

In [31]:
def single_video_res(all_people_fea, video_id):
    res = {}
    for _p, person_fea in all_people_fea.items():
        try:
            for _r, _fea in person_fea[video_id].items():
                res[f"{_p}-{_r}"] = _fea
        except:
            continue
    return res


In [32]:
def single_person_res(all_people_fea, people_id):
    res = {}
    for _v, video_fea in all_people_fea[people_id].items():
        try:
            for _r, _fea in video_fea.items():
                res[f"{_v}-{_r}"] = _fea
        except:
            continue
    return res

In [35]:
person_fea = single_person_res(all_people_fea=all_people_fea, people_id="24071512_AD")
pd.DataFrame(person_fea).T.to_csv("24071512_AD_dtw_fast_rounds.csv")

In [34]:
video_id = video_list[4]
print(video_id)
video_res = single_video_res(all_people_fea=all_people_fea, video_id=video_id)
pd.DataFrame(video_res).T

p7


Unnamed: 0,SaccadeSpeed_Mean,SaccadeSpeed_Max,SaccadeSpeed_Min,SaccadeSpeed_Std,SaccadeAngel_Mean,SaccadeAngel_Max,SaccadeAngel_Min,SaccadeAngel_Std,SaccadeDelay,TrajectoryDTW
24071512_AD-3.0,82.640553,413.202767,0.0,165.281107,1.652811,8.264055,0.0,3.305622,1820.0,300.16288
24071512_AD-8.0,304.8838,654.553153,0.0,269.090155,6.097676,13.091063,0.0,5.381803,3620.0,268.727935
24071512_AD-10.0,162.601117,221.936778,0.0,93.971469,3.252022,4.438736,0.0,1.879429,5800.0,353.752494
24071512_AD-18.0,178.374241,526.025556,0.0,208.771884,3.567485,10.520511,0.0,4.175438,11000.0,252.465585
24071512_AD-21.0,198.640895,385.736076,0.0,162.337505,3.972818,7.714722,0.0,3.24675,13100.0,197.764201
