Essential libraries to start working with !

In [None]:
import numpy as np
import pandas as pd 
from sklearn import metrics
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder

Libraries that we might need in the future !

In [None]:
# import tensorflow as tf
# import keras

# from keras.layers import Dense, Activation, Conv1D, MaxPooling1D, Flatten, LSTM, Dropout, GlobalMaxPooling1D
# from tensorflow.keras.optimizers import Adam, SGD
# from tensorflow.keras.models import Model
# from tensorflow.keras import Sequential

# from keras import regularizers
# from sklearn import preprocessing
# from sklearn.metrics import f1_score
# from scipy.stats import kurtosis, skew
# from sklearn.metrics import recall_score
# from sklearn.metrics import roc_auc_score
# from sklearn.datasets import make_circles
# from sklearn.metrics import accuracy_score
# from sklearn.metrics import precision_score
# from sklearn.metrics import confusion_matrix
# from sklearn.metrics import cohen_kappa_score
# from tensorflow.keras import utils as np_utils
# from sklearn.preprocessing import OneHotEncoder
# from keras.utils import to_categorical
# from sklearn.model_selection import train_test_split
# from sklearn.metrics import precision_recall_fscore_support
# from sklearn.preprocessing import StandardScaler, MinMaxScaler

Reading and filtering the original dataset

In [None]:
original_df = pd.read_csv('../data/ppg.csv')

In [None]:
df = original_df.copy()
df['CREATE_DATETIME'] = pd.to_datetime(df['CREATE_DATETIME'])   # Transform to datetime

df = df[df["READING_CATEGORY"] == "PPG"]                        # Keeping only the PPG signals 

encoder = LabelEncoder()
df['PATIENT_CODE'] = encoder.fit_transform(df['PATIENT_CODE'])  # Label encoding the IDs

df.drop(columns=["ID","READING_VALUE","READING_CATEGORY"],inplace=True)
df.rename(columns={"CREATE_DATETIME":"DATE"},inplace=True)

print("Nuber of unique patients:",len(df["PATIENT_CODE"].unique()))
print("Unique years of birth:",df["YEAR_OF_BIRTH"].unique())

df

In [None]:
temp_df = df[df["PATIENT_CODE"] == 11]
temp_df.reset_index(drop=True,inplace=True)
temp_df

Different methods for filtering the Singal

TO-DO: Let's find more methods

In [None]:
from scipy import signal

def apply_lowpass_filter(ppg_signal, filter_type='butter', sampling_rate=50, cutoff=5):
    nyquist_freq = 0.5 * sampling_rate
    normalized_cutoff = cutoff / nyquist_freq
    
    if filter_type == 'butter':
        b, a = signal.butter(4, normalized_cutoff, btype='low', analog=False)
    elif filter_type == 'cheby1':
        b, a = signal.cheby1(4, 0.5, normalized_cutoff, btype='low', analog=False)
    elif filter_type == 'cheby2':
        b, a = signal.cheby2(4, 20, normalized_cutoff, btype='low', analog=False)
    elif filter_type == 'elliptic':
        b, a = signal.ellip(4, 0.5, 20, normalized_cutoff, btype='low', analog=False)
    else:
        raise ValueError("Invalid filter type. Choose 'butter', 'cheby1', 'cheby2', or 'elliptic'.")
    
    ppg_filtered = signal.filtfilt(b, a, ppg_signal)
    return ppg_filtered

Feature extraction functions

TO-DO: We have to enrich our features

In [None]:
import scipy.special
from scipy.signal import find_peaks, welch
from collections import Counter
from pywt import wavedec

def calculate_entropy(list_values):
    counter_values = Counter(list_values).most_common()
    probabilities = [elem[1]/len(list_values) for elem in counter_values]
    entropy=scipy.stats.entropy(probabilities)
    return entropy

def calculate_statistics(list_values):
    n5 = np.nanpercentile(list_values, 5)
    n25 = np.nanpercentile(list_values, 25)
    n75 = np.nanpercentile(list_values, 75)
    n95 = np.nanpercentile(list_values, 95)
    median = np.nanpercentile(list_values, 50)
    mean = np.nanmean(list_values)
    std = np.nanstd(list_values)
    var = np.nanvar(list_values)
    rms = np.nanmean(np.sqrt(list_values**2))
    skew = pd.Series(list_values).skew()
    kurtosis = pd.Series(list_values).kurt()
    min = np.min(list_values)
    max = np.max(list_values)

    # Heart rate related features
    # sampling_rate = 50
    # peaks, _ = find_peaks(ppg_signal, height=0)                         # Find peaks in PPG signal
    # instant_hr = 60 * len(peaks) / len(ppg_signal) * sampling_rate      # Instantaneous Heart Rate
    # rri = np.diff(peaks) / sampling_rate                                # RR intervals
    # hrv = np.std(rri)                                                   # Heart Rate Variability

    # # Morphological features
    # peaks, _ = find_peaks(ppg_signal, distance=50)
    # peak_count = len(peaks)
    # peak_mean = np.mean(ppg_signal[peaks]) if len(peaks) > 0 else 0
    # peak_std = np.std(ppg_signal[peaks]) if len(peaks) > 0 else 0

    # Frequency domain features
    # freqs, psd = welch(ppg_signal)
    # psd_mean = np.mean(psd)
    # psd_std = np.std(psd)
    # psd_peak = freqs[np.argmax(psd)]

    # Time domain features
    # diff_signal = np.diff(ppg_signal)
    # diff_mean = np.mean(diff_signal)
    # diff_std = np.std(diff_signal)

    return [n5, n25, n75, n95, median, mean, std, var, rms, skew, kurtosis, min, max]

def calculate_crossings(list_values):
    zero_crossing_indices = np.nonzero(np.diff(np.array(list_values) > 0))[0]
    no_zero_crossings = len(zero_crossing_indices)
    mean_crossing_indices = np.nonzero(np.diff(np.array(list_values) > np.nanmean(list_values)))[0]
    no_mean_crossings = len(mean_crossing_indices)
    return [no_zero_crossings, no_mean_crossings]

def get_features(list_values):
    entropy = calculate_entropy(list_values)
    crossings = calculate_crossings(list_values)
    statistics = calculate_statistics(list_values)
    return [entropy] + crossings + statistics

Function that handles a ppg singal end-to-end (filtering --> analysis --> feature extraction)

In [None]:
def extract_features(ppg_signal):
    
    # 1) Filtering the Signal
    ppg_signal_filtered = apply_lowpass_filter(ppg_signal, filter_type='butter', sampling_rate=50, cutoff=5)

    # 2) Doing the Wavelet analysis to extract the detailed and approximate coefficients (this is considered a filtering method as well)
    coeffs = wavedec(ppg_signal_filtered, 'haar', level=5)
    cA1, cD1, cD2, cD3, cD4, cD5 = coeffs

    # Alternative: Compute the STFT of the signal
    # # Set parameters for STFT
    # fs = 50  # Sampling frequency (can be adjusted based on your data)
    # nperseg = 5  # Length of each segment

    # # Compute the STFT
    # frequencies, times, Zxx = stft(time_series, fs=fs, nperseg=nperseg)

    # # Step 3: Process the STFT output
    # # Get the magnitude (absolute value) of the complex numbers
    # magnitude_spectrum = np.abs(Zxx)

    # 3) Extracting features from the detailed and approximate coefficients
    features_1 = get_features(cD5)
    features_2 = get_features(cD4)
    features_3 = get_features(cD3)
    features_4 = get_features(cD2)
    features_5 = get_features(cD1)
    features_6 = get_features(cA1)
    
    # 4) Combining all the features
    features = features_1+features_2+features_3+features_4+features_5+features_6
    
    return features

In [None]:
total = []
valid = []
ids = []

unique_patients = df["PATIENT_CODE"].unique()

unique_patients = [7,17]

X = []
y = []

for unique_patient in unique_patients:
    temp_df = df[df["PATIENT_CODE"] == unique_patient]
    temp_df.reset_index(drop=True,inplace=True)

    # Calculate the difference between consecutive dates
    diff = temp_df['DATE'].diff()

    # Find continuous dates
    continuous_dates = diff.dt.total_seconds().fillna(0) == 1

    # Split DataFrame into subdataframes with continuous dates
    subdataframes = []
    for i, continuous in enumerate(continuous_dates):
        if i == 0 or not continuous:
            subdataframes.append(temp_df.iloc[i:i+1])
        else:
            subdataframes[-1] = pd.concat([subdataframes[-1], temp_df.iloc[i:i+1]])

    j=0
    total_timeseries = 0

    for i, subdf in enumerate(subdataframes):
        if subdf.shape[0] >= 10:                        # Keeping only timeseries of at least 10" of duration
            subdf.reset_index(drop=True,inplace=True)

            ppg_values = subdf["PPG_ARRAY"].values
            date_values = subdf["DATE"].values

            # duration = len(date_values)
            # print("Duration in seconds: ", duration)

            time_series = []
            for seconds_values in ppg_values:
                split_data = seconds_values[1:-1].split(', ')
                integer_list = [int(item) for item in split_data]

                for value in integer_list:
                    time_series.append(value)

            features = extract_features(time_series)

            if subdf["PATIENT_CODE"].values[0] == 7:
                label = 0
            else:
                label = 1

            X.append(features)
            y.append(label) 

            j+=1
        total_timeseries+=1

    ids.append(temp_df["PATIENT_CODE"].values[0])
    total.append(total_timeseries)
    valid.append(j)

X = np.array(X)  # Transforming the list of feautures --> numpy arrays
y = np.array(y)  # Transforming the list of labels--> numpy arrays

Plotting the selected patients and projecting the total number of CONTINUOUS timeseries and the timeseries above the predifiend time interval

In [None]:
fig = plt.figure(figsize=(15,10))

bar_width = 0.3

r1 = range(len(ids))
r2 = [x + bar_width for x in r1]

# Plotting
plt.bar(r1, total, color='r', width=bar_width, edgecolor='grey', label='Total Timeseries')
plt.bar(r2, valid, color='b', width=bar_width, edgecolor='grey', label='Timeseries > 10"')

# Add xticks on the middle of the group bars
plt.xlabel('ID', fontweight='bold')
plt.xticks([r + bar_width/2 for r in range(len(ids))], ids)

plt.ylabel('Num of Timeseries', fontweight='bold')
plt.legend()
plt.show()

Importing Machine-Learning libraries

In [None]:
import xgboost as xgb
from sklearn.svm import SVC, LinearSVC, NuSVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.naive_bayes import GaussianNB, BernoulliNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis, QuadraticDiscriminantAnalysis
from sklearn.linear_model import LogisticRegression, RidgeClassifier, Perceptron, PassiveAggressiveClassifier, SGDClassifier
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier, BaggingClassifier, ExtraTreesClassifier, GradientBoostingClassifier, VotingClassifier

In [None]:
import lightgbm as lgb
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold, GridSearchCV

def find_best_model(X, y):
    # Step 1: Normalize the data
    scaler = MinMaxScaler()          
    X_scaled = scaler.fit_transform(X)
    
    # Aditional Step 1: Split the data into training and testing sets
    # X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.1, shuffle=True, random_state=42)

    models = {
        'SVC': SVC(),
        'NuSVC': NuSVC(),
        'Linear SVC': LinearSVC(),
        'Decision Tree': DecisionTreeClassifier(),
        'MLP Classifier': MLPClassifier(),
        'Gaussian NB': GaussianNB(),
        'Bernoulli NB': BernoulliNB(),
        'K-Neighbors': KNeighborsClassifier(),
        'Linear Discriminant Analysis': LinearDiscriminantAnalysis(),
        'Quadratic Discriminant Analysis': QuadraticDiscriminantAnalysis(),
        'Perceptron': Perceptron(),
        'SGD Classifier': SGDClassifier(),
        'Ridge Classifier': RidgeClassifier(),
        'Logistic Regression': LogisticRegression(),
        'Passive Aggressive': PassiveAggressiveClassifier(),
        'AdaBoost': AdaBoostClassifier(),
        'Bagging': BaggingClassifier(),
        'Voting': VotingClassifier(estimators=[
            ('rf', RandomForestClassifier()),
            ('dt', DecisionTreeClassifier())
        ], voting='hard'),
        'Random Forest': RandomForestClassifier(),
        'Gradient Boosting': GradientBoostingClassifier(),
        'Extra Trees': ExtraTreesClassifier(),
        'XGBoost': xgb.XGBClassifier(n_estimators=100, use_label_encoder=False, objective='binary:logistic')
    }
    
    results = {}
    skf = StratifiedKFold(n_splits=10)
    
    for name, model in models.items():
        scores = cross_val_score(model, X_scaled, y, cv=skf, scoring='accuracy')
        results[name] = scores.mean()
    
    # Step 2: Print the results in a presentable manner
    results_df = pd.DataFrame.from_dict(results, orient='index', columns=['Accuracy'])
    results_df = results_df.sort_values(by='Accuracy', ascending=False)
    print("Cross-Validation Results (5-Fold):")
    print(results_df)
    
    # Aditional Step 2: Test the model's accuracy on the test set
    # best_model_name = results_df.idxmax().values[0]
    # best_model = models[best_model_name]
    # best_model.fit(X_train, y_train)

    # y_test_pred = best_model.predict(X_test)
    # test_accuracy = accuracy_score(y_test, y_test_pred)

    # print(f"\nBest Model: {best_model_name}")
    # print(f"Test Set Accuracy: {test_accuracy:.4f}")

    # return(y_test, y_test_pred)

In [None]:
# Run this if you run the alternative steps in "find_best_model" function: real_values , predictions = find_best_model(X,y)

# confusion_matrix = metrics.confusion_matrix(real_values, predictions)

# cm_display = metrics.ConfusionMatrixDisplay(confusion_matrix = confusion_matrix, display_labels = [0, 1])

# cm_display.plot()
# plt.show()

# # Calculate precision, recall, F1 score, and accuracy
# precision = metrics.precision_score(real_values, predictions)
# recall = metrics.recall_score(real_values, predictions)
# f1 = metrics.f1_score(real_values, predictions)
# accuracy = metrics.accuracy_score(real_values, predictions)

# # Print the results
# print(f'Precision: {precision:.2f}')
# print(f'Recall: {recall:.2f}')
# print(f'F1 Score: {f1:.2f}')
# print(f'Accuracy: {accuracy:.2f}')

In [None]:
find_best_model(X,y)

XGBoost - Finding best parameters/model 

We might this later - IGNORE for now

In [None]:
# import xgboost as xgb
# from sklearn.model_selection import StratifiedKFold, GridSearchCV

# def find_best_xgboost_model(X, y):
#     # Step 1 & 2: Scale the data
#     scaler = MinMaxScaler()          
#     X_scaled = scaler.fit_transform(X)
    
#     # Step 2: Split the data into training and testing sets
#     X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, shuffle=True, random_state=42)
   
#     # Step 3: Define the parameter grid for XGBoost
#     # param_grid = {
#     #     'n_estimators': [100, 200, 300, 400, 500],
#     #     'max_depth': [3, 4, 5, 6, 7, 8],
#     #     'learning_rate': [0.01, 0.05, 0.1, 0.15, 0.2],
#     #     'subsample': [0.6, 0.7, 0.8, 0.9, 1.0],
#     #     'colsample_bytree': [0.6, 0.7, 0.8, 0.9, 1.0],
#     #     'gamma': [0, 0.1, 0.2, 0.3],
#     #     'min_child_weight': [1, 2, 3, 4, 5]
#     # }

#     param_grid = {
#         'n_estimators': [100, 200, 300],
#         'max_depth': [3, 4, 5, 6],
#         'learning_rate': [0.01, 0.05, 0.1],
#         'subsample': [0.6, 0.8, 1.0],
#         'colsample_bytree': [0.6, 0.8, 1.0],
#         'gamma': [0, 0.1, 0.2],
#     }
    
#     # Step 4: Set up the XGBoost classifier
#     xgb_clf = xgb.XGBClassifier(use_label_encoder=False, eval_metric='logloss')
    
#     # Step 5: Set up the cross-validation and grid search
#     skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
#     grid_search = GridSearchCV(estimator=xgb_clf, param_grid=param_grid, scoring='accuracy', cv=skf, verbose=2, n_jobs=-1)
    
#     # Step 6: Perform the grid search
#     grid_search.fit(X_train, y_train)
    
#     # Step 7: Get the best model and parameters
#     best_model = grid_search.best_estimator_
#     best_params = grid_search.best_params_
    
#     # Step 8: Print the best parameters
#     print("Best Parameters found: ", best_params)
    
#     # Step 9: Test the best model's accuracy on the test set
#     y_test_pred = best_model.predict(X_test)
#     test_accuracy = accuracy_score(y_test, y_test_pred)
#     print(f"Test Set Accuracy: {test_accuracy:.4f}")
    
#     return best_model, best_params

In [None]:
# model, params = find_best_xgboost_model(X, y)