In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import neurokit2 as nk
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import FunctionTransformer
import scipy as sp
import copy 
import matplotlib.pyplot as plt
import copy
import warnings


# Silence FutureWarning / UserWarning / RuntimeWarning messages for the whole notebook
# so that cell outputs stay readable.
# NOTE(review): ignoring RuntimeWarning globally can also hide numerical problems
# (e.g. division by zero, NaN propagation) — consider narrowing these filters.
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=UserWarning)
warnings.simplefilter(action='ignore', category=RuntimeWarning)
# Turn off pandas' chained-assignment (SettingWithCopy) warning.
pd.options.mode.chained_assignment = None

%matplotlib inline

# Config

In [20]:
# ECG sampling frequency (Hz), passed to the HRV computation as `sampling_rate`.
# The 1000 Hz probably refers to the raw ECG sample rate; the RR-interval values
# in the raw data are probably reported at roughly 1 Hz — TODO confirm.
ecg_sampling_rate = 1000

# Sliding-window size and overlap ratio between consecutive windows.
# NOTE(review): split_dataset_into_windows uses the window size as a number of
# ROWS (positional slicing), so it equals seconds only if there is one row per second.
ecg_sliding_window_size = 150  # rows per window
ecg_overlapping_ratio = 0  # fraction of a window shared with the next one (0 = no overlap)

# Mapping from the raw CSV column names to descriptive names
ecg_column_mapping = {
    "Column2": "HR_bpm",
    "Column3": "RR_Interval_1024",
    "Column4": "Exadecimal1",
    "Column5": "Exadecimal2",
    "Column6": "RR_Interval_ms"
}

# Columns kept for the analysis (names as they are after renaming)
ecg_selected_columns = ["User", "Condition", "Timestamp", "HR_bpm", "RR_Interval_ms"]

# Values of the "Condition" column used to split the dataset (one subset per value, in this order)
conditions = ["user_cognitive-fatigue", "user_combo-fatigue", "user_physical-fatigue", "user_rest"]

# Path of the input CSV, relative to the notebook's working directory
ecg_first_30_users_file_path = "../../data/processed/dataset_ecg_first_30_users.csv"

# Functions

In [4]:
def rename_columns(df, column_mapping):
    """
    Build a renamed copy of a DataFrame from an old-name -> new-name mapping.

    Parameters:
        df (pd.DataFrame): Source DataFrame; it is left untouched.
        column_mapping (dict): Keys are current column names, values are the
                               replacement names. Keys that match no column are ignored.

    Returns:
        pd.DataFrame: A new DataFrame whose columns carry the mapped names.
    """
    renamed_df = df.rename(column_mapping, axis="columns")
    return renamed_df

In [5]:
def divide_column_by_value(df, column_name, divisor):
    """
    Scale one column of a DataFrame down by a constant divisor.

    Parameters:
        df (pd.DataFrame): Source DataFrame; it is not modified.
        column_name (str): Column whose values are divided.
        divisor (int or float): Value each entry of the column is divided by.

    Returns:
        pd.DataFrame: A copy of `df` in which `column_name` holds the divided values.
    """
    # Compute the scaled column first, then write it into a fresh copy
    scaled_values = df[column_name] / divisor
    result = df.copy()
    result[column_name] = scaled_values
    return result

In [6]:
def rename_column(df, old_column_name, new_column_name):
    """
    Give a single column a new name without touching the input DataFrame.

    Parameters:
        df (pd.DataFrame): Source DataFrame.
        old_column_name (str): Name the column currently has.
        new_column_name (str): Name the column should have afterwards.

    Returns:
        pd.DataFrame: A new DataFrame identical to `df` except for the renamed column.
    """
    single_rename = {old_column_name: new_column_name}
    # Without inplace=True, rename hands back a new DataFrame
    return df.rename(columns=single_rename)

In [7]:
def split_data_by_condition(df, condition_column, condition_values):
    """
    Partition a DataFrame into one subset per requested condition value.

    Parameters:
        df (pd.DataFrame): Source DataFrame.
        condition_column (str): Column whose values drive the split (e.g., "Condition").
        condition_values (list): Values to extract; one subset is produced per entry.

    Returns:
        list: Independent DataFrame copies, in the same order as `condition_values`.
              A value that never occurs yields an empty DataFrame.
    """
    subsets = []
    for wanted_value in condition_values:
        row_mask = df[condition_column] == wanted_value
        subsets.append(df[row_mask].copy())
    return subsets

In [8]:
def select_columns(df, columns_to_select):
    """
    Extract a subset of columns into an independent DataFrame.

    Parameters:
        df (pd.DataFrame): Source DataFrame.
        columns_to_select (list): Names of the columns to keep, in the desired order.

    Returns:
        pd.DataFrame: A copy holding only the requested columns.
    """
    selection = df[columns_to_select]
    # Copy so later edits to the result never reach back into `df`
    return selection.copy()

In [9]:
def convert_timestamp_to_time(df, timestamp_column):
    """
    Parse a timestamp column with pd.to_datetime and return the result in a new DataFrame.

    Parameters:
        df (pd.DataFrame): Source DataFrame; it is not modified.
        timestamp_column (str): Name of the column holding the timestamps to parse.

    Returns:
        pd.DataFrame: A copy of `df` whose `timestamp_column` holds the values
                      produced by pd.to_datetime (pandas datetimes, not datetime.time objects).
    """
    parsed_timestamps = pd.to_datetime(df[timestamp_column])
    converted = df.copy()
    converted[timestamp_column] = parsed_timestamps
    return converted

In [10]:
def split_dataset_into_windows(df, timestamp_column, column_to_split, window_size, overlap_ratio=0):
    """
    Splits the input DataFrame into consecutive (optionally overlapping) row windows.

    The rows are first sorted by `timestamp_column`; windows are then taken
    positionally, so `window_size` is a number of ROWS. It corresponds to a
    duration in seconds only when the data holds exactly one row per second.
    A trailing group of rows shorter than `window_size` is dropped.

    Parameters:
        df (pd.DataFrame): The input DataFrame.
        timestamp_column (str): The name of the timestamp column used for sorting.
        column_to_split (str): Unused; kept so existing callers keep working.
                               Every window contains all columns of `df`.
        window_size (int or float): Rows per window. Floats are truncated to int.
        overlap_ratio (float, optional): Fraction of a window shared with the next
                                         one, with 0 <= overlap_ratio < 1.

    Returns:
        list: A list of DataFrames, one per window, in chronological order.
              Empty when `df` has fewer rows than `window_size`.

    Raises:
        ValueError: If `window_size` is smaller than 1 or `overlap_ratio` is
                    outside the half-open interval [0, 1).
    """
    # range() and iloc need integers; accept values such as 150.0 as well
    window_size = int(window_size)
    if window_size < 1:
        raise ValueError("window_size must be at least 1 row")
    # A ratio of 1 would give a step of 0 (windows never advance);
    # a negative ratio would silently skip rows between windows.
    if not 0 <= overlap_ratio < 1:
        raise ValueError("overlap_ratio must satisfy 0 <= overlap_ratio < 1")

    # Sort the DataFrame by the timestamp column to ensure chronological order
    df_sorted = df.sort_values(by=timestamp_column)

    # Number of rows shared by two consecutive windows
    overlap_points = int(window_size * overlap_ratio)
    # Guard against floating-point rounding pushing the step down to 0
    step = max(1, window_size - overlap_points)

    last_start = len(df_sorted) - window_size
    return [df_sorted.iloc[start:start + window_size]
            for start in range(0, last_start + 1, step)]

In [11]:
def intervals_to_peaks(dataframe, column_name):
    """
    Turn a column of RR-Interval values into R-peak locations via NeuroKit2.

    Parameters:
        dataframe (pd.DataFrame): DataFrame holding the RR-Interval column.
        column_name (str): Name of the column with the RR-Interval values.

    Returns:
        The R-peak locations exactly as returned by nk.intervals_to_peaks.

    NOTE(review): nk.intervals_to_peaks is called with its default arguments —
    confirm that its defaults match the unit of the interval values passed in.
    """
    rr_intervals = dataframe[column_name]
    return nk.intervals_to_peaks(rr_intervals)

In [12]:
def calculate_hrv_features(r_peaks, sampling_rate, show=False):
    """
    Compute a Heart Rate Variability (HRV) feature table from R-peak locations with NeuroKit2.

    Parameters:
        r_peaks (array): Locations of the R-peaks in the ECG signal.
        sampling_rate (int): Sampling rate of the signal the peaks refer to
                             (samples per second); forwarded to nk.hrv.
        show (bool, optional): Forwarded to nk.hrv; True asks NeuroKit2 to draw
                               its HRV plots, False (default) computes silently.

    Returns:
        DataFrame: The HRV feature table exactly as returned by nk.hrv.
    """
    # Thin wrapper: all three arguments are handed straight to NeuroKit2
    return nk.hrv(r_peaks, sampling_rate=sampling_rate, show=show)

In [13]:
def concatenate_dataframes(dataframes_list, axis=0):
    """
    Merge a list of DataFrames into one with pd.concat, discarding the original labels.

    Parameters:
        dataframes_list (list): DataFrames to combine, in order.
        axis (int, optional): 0 stacks the frames vertically (rows, default);
                              1 places them side by side (columns).

    Returns:
        pd.DataFrame: The combined DataFrame. Labels along the concatenation
                      axis are replaced by 0..n-1 (ignore_index=True).
    """
    return pd.concat(dataframes_list, axis=axis, ignore_index=True)

# Execution Pipeline

In [14]:
# Load the ECG dataset from the CSV file configured in ecg_first_30_users_file_path
df_ecg = pd.read_csv(ecg_first_30_users_file_path)

In [15]:
# Rename the raw columns ("Column2", ...) to the descriptive names in ecg_column_mapping
df_ecg_renamed = rename_columns(df_ecg, ecg_column_mapping)

In [16]:
# Keep only the columns of interest listed in ecg_selected_columns
df_ecg_selected = select_columns(df_ecg_renamed, ecg_selected_columns)

In [17]:
# Parse the "Timestamp" column into pandas datetime values (needed for chronological sorting)
df_ecg_converted_timestamp = convert_timestamp_to_time(df_ecg_selected, "Timestamp")

In [18]:
# Split the dataset into one DataFrame per condition, in the order given by `conditions`
subset_dataframes_list = split_data_by_condition(df_ecg_converted_timestamp, "Condition", conditions)

In [21]:
# Generate the ECG Features dataset for each Condition:
# one HRV feature DataFrame per condition, built from per-user sliding windows.
# NOTE(review): assumes the "User" column holds labels "user0" ... "user29" — confirm against the dataset.
user_list = [f"user{i}" for i in range(30)]
hrv_df_condition_list = list()

for df in subset_dataframes_list:

    hrv_df_list = list()

    # The condition label is the same for every window of this subset, so read it
    # once here; an empty subset has no label (and produces no windows below).
    condition_label = df['Condition'].iloc[0] if len(df) > 0 else None

    for user in user_list:
        # Get the df subset of a specific user
        user_subset = df.loc[df['User'] == user]
        # Split dataset into temporal subsets
        temporal_subsets = split_dataset_into_windows(user_subset, "Timestamp", "RR_Interval_ms",
                                                      ecg_sliding_window_size, ecg_overlapping_ratio)
        for temporal_subset in temporal_subsets:
            # Compute R-Peaks from RR Interval
            r_peaks = intervals_to_peaks(temporal_subset, "RR_Interval_ms")
            # Get HRV features for each temporal subset
            hrv_features_df = calculate_hrv_features(r_peaks, sampling_rate=ecg_sampling_rate, show=False)
            # Prepend the User, Condition and mean heart-rate columns on the left
            hrv_features_df.insert(0, "ECG_Rate_Mean", temporal_subset['HR_bpm'].mean())
            hrv_features_df.insert(0, "Condition", condition_label)
            hrv_features_df.insert(0, "User", user)
            hrv_df_list.append(hrv_features_df)

    # pd.concat raises ValueError on an empty list; keep one (empty) entry per
    # condition so the result list stays aligned with subset_dataframes_list.
    if hrv_df_list:
        hrv_df_condition = concatenate_dataframes(hrv_df_list, axis=0)
    else:
        hrv_df_condition = pd.DataFrame()
    hrv_df_condition_list.append(hrv_df_condition)

### Inspect the DataFrames

In [22]:
# (rows, columns) of the HRV feature table for conditions[0] ("user_cognitive-fatigue")
hrv_df_condition_list[0].shape

(61, 94)

In [None]:
# (rows, columns) of the HRV feature table for conditions[1] ("user_combo-fatigue")
hrv_df_condition_list[1].shape

In [None]:
# (rows, columns) of the HRV feature table for conditions[2] ("user_physical-fatigue")
hrv_df_condition_list[2].shape

In [None]:
# (rows, columns) of the HRV feature table for conditions[3] ("user_rest")
hrv_df_condition_list[3].shape