In [None]:
import pandas as pd
import numpy as np
from pathlib import Path


In [None]:
# Study year being processed; it selects the input/output folders below and is
# later passed to rename_csv_file_for_year to decide whether tasks are renumbered.
year = 2

# Folder holding the original per-task feature CSV files (input).
base_feature_path = Path(f"C:/Users/Emanuele/Desktop/Feat_extr_ANNO{year}/original_features")

# Folder where the engineered feature CSV files are written (output).
path_eng_features = Path(f"C:/Users/Emanuele/Desktop/Feat_extr_ANNO{year}/eng_features")

# Create the output folder (and any missing parent folders) if it does not exist yet.
if not path_eng_features.exists():
    path_eng_features.mkdir(parents=True)
    
# Collect every CSV file found directly inside the input folder.
csv_files = list(base_feature_path.glob("*.csv"))

# Sort the CSV files by the task number in their names (Task_1, Task_2, etc.).
# The key parses the integer that follows the first underscore of the file stem,
# so a CSV whose name does not follow that pattern makes the sort raise
# (IndexError when there is no underscore, ValueError when the part is not an integer).
csv_files.sort(key=lambda x: int(x.stem.split('_')[1]))

In [None]:
# Columns selected from every input CSV before the derived features are added.
# The processing loop indexes each DataFrame with this list, so every name here
# must exist in the input files. The entries are grouped by feature family.
columns_to_keep = [
    # Mandatory
    'Id',

    # Dynamic parameters
    'mean:pressure', 'std:pressure', 'iqr:pressure',
    'mean:tilt(on-surface)', 'std:tilt(on-surface)', 'iqr:tilt(on-surface)',

    # Temporal parameters
    'writing_duration(on-surface)',
    'writing_duration_overall',
    'number_of_interruptions',

    # Spatial parameters
    'writing_width(on-surface)',
    'writing_height(on-surface)',
    'mean:stroke_width(on-surface)', 'std:stroke_width(on-surface)', 'iqr:stroke_width(on-surface)',
    'mean:stroke_height(on-surface)', 'std:stroke_height(on-surface)', 'iqr:stroke_height(on-surface)',

    # Kinematic parameters
    'mean:velocity:axis-xy(on-surface)',
    'std:velocity:axis-xy(on-surface)',
    'mean:acceleration:axis-xy(on-surface)',
    'std:acceleration:axis-xy(on-surface)',
    'mean:jerk:axis-xy(on-surface)',
    'mean:stroke_duration(on-surface)',
    'mean:stroke_length(on-surface)',
    'number_of_changes_in_x_profile',
    'number_of_changes_in_y_profile',
    'number_of_changes_in_pressure_profile',
    
    # Personal information (moved to the end of the output by the processing loop)
    'Gender', 'Age', 'Dominant_Hand', 'Label', 'Task'
]

In [None]:
def compute_in_air_movement_time(df):
    """
    Derive the in-air movement time of each row.

    The in-air time is the overall writing duration minus the on-surface
    writing duration, floored at zero so that it is never negative.

    Parameters
    ----------
    df : pd.DataFrame
        Must provide the columns 'writing_duration_overall' and
        'writing_duration(on-surface)'.

    Returns
    -------
    pd.Series
        In-air movement time per row, sharing the index of ``df``.
        Missing values in either input column stay missing.
    """
    overall = df["writing_duration_overall"]
    on_surface = df["writing_duration(on-surface)"]
    # clip(lower=0) turns negative differences into zero and leaves NaN untouched
    return (overall - on_surface).clip(lower=0)


In [None]:
def compute_in_air_on_paper_ratio(df):
    """
    Compute ratio between in-air movement time and on-paper movement time.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame containing at least:
        - 'writing_duration_overall'
        - 'writing_duration(on-surface)'

    Returns
    -------
    pd.Series
        Series (indexed like ``df``) with the in-air/on-paper ratio for each
        row. Rows whose on-paper time is zero, negative or missing get 0.
    """
    in_air_time = compute_in_air_movement_time(df)
    on_paper_time = df["writing_duration(on-surface)"]

    # Only rows with a strictly positive denominator are divided, so no
    # division by zero is ever evaluated; every other row keeps the 0 default.
    has_on_paper_time = on_paper_time > 0

    # Return a Series aligned with df (as documented) instead of a bare ndarray.
    ratio = pd.Series(0.0, index=df.index)
    ratio.loc[has_on_paper_time] = (
        in_air_time[has_on_paper_time] / on_paper_time[has_on_paper_time]
    )
    return ratio


In [None]:
import numpy as np

def compute_average_normalized_jerk(df):
    """
    Compute the Average Normalized Jerk (ANJ) of every row.

    For a row with mean jerk J, mean stroke duration T and mean stroke
    length L, the value is sqrt(0.5 * J * T**5 / L**2).

    Parameters
    ----------
    df : pd.DataFrame
        Must provide the columns:
        - 'mean:jerk:axis-xy(on-surface)'
        - 'mean:stroke_duration(on-surface)'
        - 'mean:stroke_length(on-surface)'

    Returns
    -------
    pd.Series
        Float Series indexed like ``df``. Rows where the stroke duration or
        stroke length is not strictly positive, where the jerk is negative,
        or where any of the three values is missing are set to 0; no
        exception is raised for them.
    """
    jerk = df["mean:jerk:axis-xy(on-surface)"]
    duration = df["mean:stroke_duration(on-surface)"]
    length = df["mean:stroke_length(on-surface)"]

    # Every row starts at 0.0; only the usable rows are overwritten below
    result = pd.Series(0, index=df.index, dtype=float)

    # Comparisons against NaN are False, so rows with missing data are excluded too
    usable = duration.gt(0) & length.gt(0) & jerk.ge(0)
    if not usable.any():
        return result

    scale = (duration[usable] ** 5) / (length[usable] ** 2)
    result.loc[usable] = np.sqrt(0.5 * jerk[usable] * scale)
    return result


In [None]:
def get_task_mapping_for_year(year):
    """
    Get the task mapping dictionary based on the year.

    For year 3, tasks 1, 2 and 5 of the original numbering are not computed
    and the remaining tasks are renumbered consecutively starting from 1:
    - Task_3 becomes Task_1
    - Task_4 becomes Task_2
    - Task_5 is skipped (not computed)
    - Task_6 becomes Task_3
    - Task_7 becomes Task_4
    - ... and so on, up to Task_22 which becomes Task_19

    Parameters
    ----------
    year : int
        The year for which to get the task mapping

    Returns
    -------
    dict or None
        For year 3, a dictionary mapping original task numbers (1 to 22) to
        new task numbers; None values indicate tasks that should be skipped.
        For any other year, None (no remapping is needed).
    """
    if year != 3:
        # For years 1 and 2, no mapping needed (identity mapping)
        return None

    skipped_tasks = {1, 2, 5}
    last_task = 22  # Assuming tasks go up to 22

    # Number the kept tasks consecutively. Deriving the new number from a
    # running counter (rather than a fixed offset) guarantees that two
    # original tasks can never be mapped to the same new task number, which
    # would make one output file silently overwrite the other.
    task_mapping = {}
    next_task = 1
    for original_task in range(1, last_task + 1):
        if original_task in skipped_tasks:
            task_mapping[original_task] = None
        else:
            task_mapping[original_task] = next_task
            next_task += 1

    return task_mapping


def rename_csv_file_for_year(csv_file_path, year):
    """
    Work out the output path of a task CSV file under the year's task mapping.

    No file is renamed on disk; only the path is computed.

    Parameters
    ----------
    csv_file_path : Path
        Path to the CSV file, expected to be named like 'Task_<n>.csv'
    year : int
        The year whose task mapping is applied

    Returns
    -------
    Path or None
        - The original path when the year has no mapping, when the file name
          does not look like 'Task_*.csv', or when no task number can be
          parsed from it.
        - A sibling path named 'Task_<new>.csv' when the task is remapped.
        - None when the task maps to None or is absent from the mapping,
          meaning the file should be skipped.
    """
    mapping = get_task_mapping_for_year(year)
    if mapping is None:
        # This year keeps the original task numbering
        return csv_file_path

    name = csv_file_path.name
    if not (name.startswith("Task_") and name.endswith(".csv")):
        # Unexpected file name: leave the path untouched
        return csv_file_path

    try:
        # Task number = text between the first underscore and the following dot
        source_task = int(name.split("_")[1].split(".")[0])
    except (ValueError, IndexError):
        # Task number could not be parsed: leave the path untouched
        return csv_file_path

    target_task = mapping.get(source_task)
    if target_task is None:
        # Skipped task (or a task number the mapping does not know about)
        return None

    return csv_file_path.with_name(f"Task_{target_task}.csv")

In [None]:
# Main processing loop: for each input CSV, select the configured columns, add the
# derived features, renumber the task when the year requires it, and write the result
# to the engineered-features folder.
for csv_file in csv_files:
    df = pd.read_csv(csv_file)
    
    print(f"Processing {csv_file.name}...")
    
    # A file with no data rows produces no output
    if df.empty:
        print(f"Skipping {csv_file.name} because it is empty.")
        continue

    # Check if this file should be processed based on year.
    # The result is the (possibly renamed) path, or None when the task is skipped.
    new_file_path = rename_csv_file_for_year(csv_file, year)
    
    if new_file_path is None:
        print(f"Skipping {csv_file.name} - not computed for year {year}")
        continue

    # Selecting with a list of labels raises KeyError if any listed column is missing
    # from the CSV; .copy() keeps the assignments below from touching df.
    df_filtered = df[columns_to_keep].copy()

    # Add derived features safely
    df_filtered["in_air_movement_time"] = compute_in_air_movement_time(df_filtered)
    df_filtered["in_air_on_paper_ratio"] = compute_in_air_on_paper_ratio(df_filtered)
    df_filtered["average_normalized_jerk"] = compute_average_normalized_jerk(df_filtered)
    
    # Update Task column if year 3 remapping is applied
    if year == 3 and 'Task' in df_filtered.columns:
        # Extract new task number from the new filename
        # (parsed as the integer after the first underscore of the stem)
        new_task_num = int(new_file_path.stem.split('_')[1])
        # Every row of the file receives the same remapped task number
        df_filtered['Task'] = new_task_num
        print(f"Updated Task column from {csv_file.stem} to Task_{new_task_num}")
    
    # Move Columns 'Gender', 'Age', 'Dominant_Hand', 'Label', 'Task' to the end
    # so they come after the derived feature columns appended above.
    columns_to_move = ['Gender', 'Age', 'Dominant_Hand', 'Label', 'Task']
    
    for col in columns_to_move:
        if col in df_filtered.columns:
            # Move the column to the end: pop removes it, re-assigning appends it last
            df_filtered[col] = df_filtered.pop(col)
        else:
            # Only reachable if the column was not part of columns_to_keep
            print(f"Column {col} not found in {csv_file.name}, skipping.")
            
    # Round numeric columns to 5 decimal places
    numeric_cols = df_filtered.select_dtypes(include=[np.number]).columns
    df_filtered[numeric_cols] = df_filtered[numeric_cols].round(5)
    
    # Save the processed DataFrame using the new filename.
    # Only the file name of new_file_path is used; the folder is always path_eng_features.
    output_file = path_eng_features / new_file_path.name
    df_filtered.to_csv(output_file, index=False)
    
    print(f"Saved as {new_file_path.name}")

