In [9]:
import pandas as pd 
import numpy as np
from tqdm import tqdm
from scipy.fft import fft, dct
import scipy.stats
from pathlib import Path
import os
import sys

In [None]:
# Constants
ref_x = [885, 885, 640, 738, 486, 235, 332, 87, 389, 486, 583, 886, 886]
ref_y = [394, 436, 628, 935, 741, 935, 628, 436, 436, 141, 436, 436, 394]
x_threshold = [650, 280, 210, 520]
y_threshold = [850, 850, 460, 250]

def slice_df_position(df, x_threshold, y_threshold):
    """Slice dataframe based on position thresholds"""
    df_array = []
    pix = 0
    remainder_df = []

    for i in range(len(x_threshold)):
        remainder_df = df.iloc[pix:].copy()
        if i == 0:
            ix = remainder_df.loc[(remainder_df['X'] < x_threshold[i]) & (remainder_df['Y'] > y_threshold[i])].index[0]
        elif i == 1:
            ix = remainder_df.loc[(remainder_df['X'] < x_threshold[i]) & (remainder_df['Y'] < y_threshold[i])].index[0]
        elif i == 2:
            ix = remainder_df.loc[(remainder_df['X'] > x_threshold[i]) & (remainder_df['Y'] < y_threshold[i])].index[0]
        elif i == 3:
            ix = remainder_df.loc[(remainder_df['X'] > x_threshold[i])].index[0]
            
        df_slice = df.iloc[pix:ix].copy()
        df_slice.reset_index(drop=True, inplace=True)
        df_array.append(df_slice)
        pix = ix + 1
    df_slice = df.iloc[ix:].copy()
    df_slice.reset_index(drop=True, inplace=True)
    df_array.append(df_slice)
    
    return df_array

def load_and_prepare_reference():
    """Load and prepare reference trajectory"""
    # Reference points for the star pentagon
    ref_x = [885, 885, 640, 738, 486, 235, 332, 87, 389, 486, 583, 886, 886]
    ref_y = [394, 436, 628, 935, 741, 935, 628, 436, 436, 141, 436, 436, 394]
    
    # Interpolation parameters
    num_points = 50
    threshold_distance = 58
    
    # Interpolate points
    interpolated_points = []
    events = []
    
    for i in range(1, len(ref_x)):
        x_values = np.linspace(ref_x[i - 1], ref_x[i], num_points)
        y_values = np.linspace(ref_y[i - 1], ref_y[i], num_points)
        interpolated_points.extend(list(zip(x_values, y_values)))
        
        # Event assignment logic
        if i == 1 or i == len(ref_x) - 1:
            events.extend(np.zeros(num_points))
        elif i % 2 == 1:
            events.extend([1 if np.sqrt((x - ref_x[i])**2 + (y - ref_y[i])**2) <= threshold_distance else 0 
                         for x, y in zip(x_values, y_values)])
        else:
            events.extend([1 if np.sqrt((x - ref_x[i-1])**2 + (y - ref_y[i-1])**2) <= threshold_distance else 0 
                         for x, y in zip(x_values, y_values)])
    
    df_ref = pd.DataFrame(interpolated_points, columns=['X', 'Y'])
    df_ref['Event'] = events
    return df_ref

# Original feature functions
def calculate_tracking_error(subtrial_dict, df_ref_section, penalty_weight=2):
    """Calculate tracking error for a single subtrial"""
    # Convert subtrial dictionary to dataframe
    df_trial = pd.DataFrame({
        'X': subtrial_dict['X'],
        'Y': subtrial_dict['Y'],
        'Event': subtrial_dict['Event']
    })
    
    # Calculate center-based distances
    center_x, center_y = 510, 512
    df_trial['d_cen'] = np.sqrt((df_trial['X'] - center_x)**2 + (df_trial['Y'] - center_y)**2)
    df_ref_section['d_cen'] = np.sqrt((df_ref_section['X'] - center_x)**2 + 
                                     (df_ref_section['Y'] - center_y)**2)
    
    # Calculate signed errors
    signed_errors = []
    for _, row in df_trial.iterrows():
        distances = np.sqrt((df_ref_section['X'] - row['X'])**2 + 
                          (df_ref_section['Y'] - row['Y'])**2)
        min_idx = np.argmin(distances)
        min_dist = distances[min_idx]
        
        # Sign determination: negative if reference point is further from center
        sign = -1 if df_ref_section['d_cen'].iloc[min_idx] > row['d_cen'] else 1
        signed_errors.append(sign * min_dist)
    
    # Calculate error index with penalties
    error_sum_sq = sum([(error**2 * penalty_weight if error < 0 else error**2) 
                       for error in signed_errors])
    rmse = np.sqrt(error_sum_sq / len(signed_errors))
    
    # Apply saturation
    return min(rmse, 20)

def calculate_total_path_length_relative(subtrial_dict, df_ref_section):
    """Calculate relative path length between actual and reference trajectory"""
    # Convert subtrial dictionary to arrays for faster computation
    x_actual = np.array(list(subtrial_dict['X'].values()))
    y_actual = np.array(list(subtrial_dict['Y'].values()))
    
    # Calculate actual path length
    dx_actual = np.diff(x_actual)
    dy_actual = np.diff(y_actual)
    total_path_length = np.sum(np.sqrt(dx_actual**2 + dy_actual**2))
    
    # Calculate reference path length
    dx_ref = np.diff(df_ref_section['X'].values)
    dy_ref = np.diff(df_ref_section['Y'].values)
    total_path_length_ref = np.sum(np.sqrt(dx_ref**2 + dy_ref**2))
    
    return total_path_length / total_path_length_ref

def calculate_completion_time(subtrial_dict):
    """Calculate total time taken for subtrial"""
    timestamps = np.array(list(subtrial_dict['Timestamp'].values()))
    return timestamps[-1] - timestamps[0]

def calculate_speed_metrics(subtrial_dict):
    """Calculate average speed and speed variance"""
    # Convert to numpy arrays for faster computation
    timestamps = np.array(list(subtrial_dict['Timestamp'].values()))
    x = np.array((list(subtrial_dict['X'].values())))
    y = np.array((list(subtrial_dict['Y'].values())))
    
    # Calculate distances and time differences
    dx = np.diff(x)
    dy = np.diff(y)
    dt = np.diff(timestamps)
    
    # Handle zero time differences
    valid_idx = dt > 0
    if not np.any(valid_idx):
        return 0, 0  # Return zero if no valid measurements
        
    # Calculate speeds only for valid time differences
    speeds = np.zeros(len(dt))
    speeds[valid_idx] = np.sqrt(dx[valid_idx]**2 + dy[valid_idx]**2) / dt[valid_idx]
    
    # Remove any infinite or NaN values
    speeds = speeds[np.isfinite(speeds)]
    if len(speeds) == 0:
        return 0, 0
        
    return np.mean(speeds), np.var(speeds)

def calculate_jerk(subtrial_dict):
    """Calculate average jerk (rate of change of acceleration)"""
    timestamps = np.array(list(subtrial_dict['Timestamp'].values()))
    x = np.array((list(subtrial_dict['X'].values())))
    y = np.array((list(subtrial_dict['Y'].values())))
    
    if len(timestamps) < 4:  # Need at least 4 points for jerk
        return 0
    
    # First derivative (velocity)
    dt = np.diff(timestamps)
    valid_dt = dt > 0
    if not np.any(valid_dt):
        return 0
        
    vx = np.zeros(len(dt))
    vy = np.zeros(len(dt))
    vx[valid_dt] = np.diff(x)[valid_dt] / dt[valid_dt]
    vy[valid_dt] = np.diff(y)[valid_dt] / dt[valid_dt]
    
    # Second derivative (acceleration)
    dt2 = np.diff(timestamps[1:])
    valid_dt2 = dt2 > 0
    if not np.any(valid_dt2):
        return 0
        
    ax = np.zeros(len(dt2))
    ay = np.zeros(len(dt2))
    ax[valid_dt2] = np.diff(vx)[valid_dt2] / dt2[valid_dt2]
    ay[valid_dt2] = np.diff(vy)[valid_dt2] / dt2[valid_dt2]
    
    # Third derivative (jerk)
    dt3 = np.diff(timestamps[2:])
    valid_dt3 = dt3 > 0
    if not np.any(valid_dt3):
        return 0
        
    jx = np.zeros(len(dt3))
    jy = np.zeros(len(dt3))
    jx[valid_dt3] = np.diff(ax)[valid_dt3] / dt3[valid_dt3]
    jy[valid_dt3] = np.diff(ay)[valid_dt3] / dt3[valid_dt3]
    
    # Calculate jerk magnitude
    jerk_magnitude = np.sqrt(jx**2 + jy**2)
    jerk_magnitude = jerk_magnitude[np.isfinite(jerk_magnitude)]
    
    if len(jerk_magnitude) == 0:
        return 0
        
    return np.mean(jerk_magnitude)

# New feature functions
def calculate_angular_speed(subtrial_dict):
    """Calculate average angular speed (rate of change of movement direction)"""
    x = np.array(list(subtrial_dict['X'].values()))
    y = np.array(list(subtrial_dict['Y'].values()))
    timestamps = np.array(list(subtrial_dict['Timestamp'].values()))
    
    if len(timestamps) < 3:  # Need at least 3 points
        return 0, 0
    
    # Calculate velocity vectors
    dx = np.diff(x)
    dy = np.diff(y)
    dt = np.diff(timestamps)
    
    valid_idx = dt > 0
    if not np.any(valid_idx):
        return 0, 0
    
    # Calculate angles of velocity vectors
    angles = np.arctan2(dy[valid_idx], dx[valid_idx])
    
    # Calculate angular differences (handling wrap-around)
    angle_diff = np.diff(angles)
    # Normalize to [-pi, pi]
    angle_diff = (angle_diff + np.pi) % (2 * np.pi) - np.pi
    
    # Calculate angular speeds
    dt_angles = dt[valid_idx][1:]  # Time differences for angle changes
    valid_dt_angles = dt_angles > 0
    
    if not np.any(valid_dt_angles):
        return 0, 0
    
    angular_speeds = np.abs(angle_diff[valid_dt_angles] / dt_angles[valid_dt_angles])
    
    # Remove any infinite or NaN values
    angular_speeds = angular_speeds[np.isfinite(angular_speeds)]
    
    if len(angular_speeds) == 0:
        return 0, 0
    
    return np.mean(angular_speeds), np.var(angular_speeds)

def calculate_spectral_entropy(subtrial_dict):
    """Calculate spectral entropy of the speed signal"""
    # First get the speed signal
    timestamps = np.array(list(subtrial_dict['Timestamp'].values()))
    x = np.array(list(subtrial_dict['X'].values()))
    y = np.array(list(subtrial_dict['Y'].values()))
    
    if len(timestamps) < 4:
        return 0
    
    # Calculate speeds
    dx = np.diff(x)
    dy = np.diff(y)
    dt = np.diff(timestamps)
    
    valid_idx = dt > 0
    if not np.any(valid_idx):
        return 0
    
    speeds = np.zeros(len(dt))
    speeds[valid_idx] = np.sqrt(dx[valid_idx]**2 + dy[valid_idx]**2) / dt[valid_idx]
    speeds = speeds[np.isfinite(speeds)]
    
    if len(speeds) < 4:
        return 0
    
    # Apply FFT
    fft_values = np.abs(fft(speeds))
    # Use only positive frequencies
    fft_values = fft_values[:len(fft_values)//2]
    
    # Calculate power spectrum
    power_spectrum = fft_values**2
    
    # Normalize to get probability distribution
    if np.sum(power_spectrum) == 0:
        return 0
    
    p_k = power_spectrum / np.sum(power_spectrum)
    
    # Calculate entropy (avoiding log(0))
    p_k = p_k[p_k > 0]
    spectral_entropy = -np.sum(p_k * np.log2(p_k))
    
    return spectral_entropy

def calculate_fft_features(subtrial_dict):
    """Calculate FFT-based features from speed signal"""
    # Get speed signal
    timestamps = np.array(list(subtrial_dict['Timestamp'].values()))
    x = np.array(list(subtrial_dict['X'].values()))
    y = np.array(list(subtrial_dict['Y'].values()))
    
    if len(timestamps) < 4:
        return 0, 0, 0  # Return three zeros for three features
    
    # Calculate speeds
    dx = np.diff(x)
    dy = np.diff(y)
    dt = np.diff(timestamps)
    
    valid_idx = dt > 0
    if not np.any(valid_idx):
        return 0, 0, 0
    
    speeds = np.zeros(len(dt))
    speeds[valid_idx] = np.sqrt(dx[valid_idx]**2 + dy[valid_idx]**2) / dt[valid_idx]
    speeds = speeds[np.isfinite(speeds)]
    
    if len(speeds) < 4:
        return 0, 0, 0
    
    # Calculate sampling frequency
    mean_dt = np.mean(dt[valid_idx])
    fs = 1.0 / mean_dt if mean_dt > 0 else 1.0
    
    # Apply FFT
    fft_values = np.abs(fft(speeds))
    frequencies = np.fft.fftfreq(len(speeds), 1/fs)
    
    # Use only positive frequencies
    positive_idx = frequencies > 0
    fft_values = fft_values[positive_idx]
    frequencies = frequencies[positive_idx]
    
    if len(fft_values) == 0:
        return 0, 0, 0
    
    # Find dominant frequency
    dominant_idx = np.argmax(fft_values)
    dominant_frequency = frequencies[dominant_idx]
    
    # Calculate mean frequency (weighted by power)
    power = fft_values**2
    if np.sum(power) > 0:
        mean_frequency = np.sum(frequencies * power) / np.sum(power)
        # Variance of frequency
        frequency_variance = np.sum((frequencies - mean_frequency)**2 * power) / np.sum(power)
    else:
        mean_frequency = 0
        frequency_variance = 0
    
    return dominant_frequency, mean_frequency, frequency_variance

def calculate_dct_features(subtrial_dict):
    """Calculate DCT-based features from speed signal"""
    # Get speed signal
    timestamps = np.array(list(subtrial_dict['Timestamp'].values()))
    x = np.array(list(subtrial_dict['X'].values()))
    y = np.array(list(subtrial_dict['Y'].values()))
    
    if len(timestamps) < 4:
        return 0, 0  # Return two zeros for two features
    
    # Calculate speeds
    dx = np.diff(x)
    dy = np.diff(y)
    dt = np.diff(timestamps)
    
    valid_idx = dt > 0
    if not np.any(valid_idx):
        return 0, 0
    
    speeds = np.zeros(len(dt))
    speeds[valid_idx] = np.sqrt(dx[valid_idx]**2 + dy[valid_idx]**2) / dt[valid_idx]
    speeds = speeds[np.isfinite(speeds)]
    
    if len(speeds) < 4:
        return 0, 0
    
    # Apply DCT
    dct_coeffs = dct(speeds, type=2, norm='ortho')
    
    # Calculate energy (sum of squared coefficients)
    dct_energy = np.sum(dct_coeffs**2)
    
    # Calculate entropy of DCT coefficients
    # Normalize absolute values to get probability distribution
    abs_coeffs = np.abs(dct_coeffs)
    if np.sum(abs_coeffs) > 0:
        p_coeffs = abs_coeffs / np.sum(abs_coeffs)
        p_coeffs = p_coeffs[p_coeffs > 0]  # Remove zeros
        dct_entropy = -np.sum(p_coeffs * np.log2(p_coeffs))
    else:
        dct_entropy = 0
    
    return dct_energy, dct_entropy

def calculate_approximate_entropy(subtrial_dict, m=2, r=None):
    """Calculate Approximate Entropy (ApEn) of speed signal"""
    # Get speed signal
    timestamps = np.array(list(subtrial_dict['Timestamp'].values()))
    x = np.array(list(subtrial_dict['X'].values()))
    y = np.array(list(subtrial_dict['Y'].values()))
    
    if len(timestamps) < m + 2:
        return 0
    
    # Calculate speeds
    dx = np.diff(x)
    dy = np.diff(y)
    dt = np.diff(timestamps)
    
    valid_idx = dt > 0
    if not np.any(valid_idx):
        return 0
    
    speeds = np.zeros(len(dt))
    speeds[valid_idx] = np.sqrt(dx[valid_idx]**2 + dy[valid_idx]**2) / dt[valid_idx]
    speeds = speeds[np.isfinite(speeds)]
    
    N = len(speeds)
    if N < m + 2:
        return 0
    
    # Set tolerance if not provided
    if r is None:
        r = 0.2 * np.std(speeds)
        if r == 0:
            r = 0.1  # Default if std is 0
    
    def _maxdist(xi, xj, m):
        """Calculate maximum absolute difference between patterns"""
        return max([abs(float(a) - float(b)) for a, b in zip(xi, xj)])
    
    def _phi(m):
        """Calculate phi(m)"""
        patterns = [speeds[i:i+m] for i in range(N - m + 1)]
        C = []
        
        for i in range(len(patterns)):
            count = 0
            for j in range(len(patterns)):
                if _maxdist(patterns[i], patterns[j], m) <= r:
                    count += 1
            C.append(count / (N - m + 1))
        
        phi = sum([np.log(c) for c in C if c > 0]) / len(patterns)
        return phi
    
    try:
        return _phi(m) - _phi(m + 1)
    except:
        return 0

def main():
    """Main function to calculate all features"""
    current_dir = Path(os.getcwd())
    df_path = current_dir.parent / "data" / "processed" / "master_dataset.csv"
    df = pd.read_csv(df_path, encoding='unicode_escape')
    df_ref = load_and_prepare_reference()
    df_ref_slices = slice_df_position(df_ref, x_threshold, y_threshold)
    
    df['feature_vector'] = [{} for _ in range(len(df))]
    df.Subtrial = df.Subtrial.apply(eval)
    
    for idx, row in tqdm(df.iterrows(), total=df.shape[0]):
        subtrial_num = int(row['id'].split('-')[2][1]) - 1
        
        features = {}
        
        # Original features
        features['error'] = calculate_tracking_error(row['Subtrial'], df_ref_slices[subtrial_num])
        features['total_path_length_relative'] = calculate_total_path_length_relative(row['Subtrial'], df_ref_slices[subtrial_num])
        features['completion_time'] = calculate_completion_time(row['Subtrial'])
        features['speed_metric_avg'], features['speed_metric_var'] = calculate_speed_metrics(row['Subtrial'])
        features['jerk'] = calculate_jerk(row['Subtrial'])
        
        # New features
        features['angular_speed_avg'], features['angular_speed_var'] = calculate_angular_speed(row['Subtrial'])
        features['spectral_entropy'] = calculate_spectral_entropy(row['Subtrial'])
        features['approximate_entropy'] = calculate_approximate_entropy(row['Subtrial'])
        
        # FFT features
        fft_results = calculate_fft_features(row['Subtrial'])
        features['dominant_frequency'] = fft_results[0]
        features['mean_frequency'] = fft_results[1] 
        features['frequency_variance'] = fft_results[2]
        
        # DCT features
        dct_results = calculate_dct_features(row['Subtrial'])
        features['dct_energy'] = dct_results[0]
        features['dct_entropy'] = dct_results[1]
        
        df.at[idx, 'feature_vector'] = features
    
    return df

In [15]:
df_final = main()
df_final.head()

FileNotFoundError: [Errno 2] No such file or directory: 'data/processed/master_dataset.csv'