In [21]:
import numpy as np
from pathlib import Path
import pandas as pd
import shap
import time
import os
import wfdb
from pathlib import Path
import re 
from collections import defaultdict
import matplotlib.pyplot as plt
import numpy as np
from scipy.optimize import minimize

class Patient:
    def __init__(self):
        self.studies = []
        self.hAbp = []
        self.hIcp = []
        self.hct = []
    def __str__(self) -> str:
        return f"studies : {len(self.studies)} "
    

folder_path = Path('physionet.org/files/neurocritical-pediatric/1.0.0/waves')
header = [('ABP', float), ('CBFV', float), ('ICP', float)]

patient_static_map = defaultdict(Patient)

def process_time_series(folder_path, header):
    for filename in folder_path.glob('*.dat'):  # Ensure we process only .dat files
        
        entity = filename.stem.split("_")
        id = entity[0][7:]        
        # Read the data record
        record = wfdb.rdrecord(str(filename.with_suffix('')), sampfrom=0)
        data = np.empty(record.sig_len, dtype=header)
        temp_labels = []
        
        for i, signal in enumerate(record.p_signal.T):
            if i == 0:
                data['ABP'] = signal
            elif i == 1:
                data['ICP'] = signal
            else:
                data['CBFV'] = signal

        # normalized = normalize_data(data)
        patient_static_map[id].studies.append(data)

    print("Time Series Completed for Each Patient")


def patient_characteristics():
    for filename in folder_path.glob('*.hea'):
        entity = filename.stem.split("_")
        id = entity[0][7:]

        hAbp, hIcp, hct = 0, 0, 0 # Default values        
        with open(filename.with_suffix('.hea'), 'r') as f:
            for line in f:
                if "#" in line:
                    feature, value = re.search(r'\b(?:hAbp|hIcp|Hct)\b', line)[0], re.search(r'\b\d+(\.\d+)?\b', line)[0]
                    if feature == "hAbp":
                        hAbp = float(value)
                    elif feature == 'hIcp':
                        hIcp = float(value)
                    elif feature == 'Hct':
                        hct = float(value)
        
        patient_static_map[id].hIcp.append(hIcp)
        patient_static_map[id].hAbp.append(hAbp)
        patient_static_map[id].hct.append(hct)


def process_data():
    process_time_series(folder_path, header)
    patient_characteristics()
        
process_data()

Time Series Completed for Each Patient


In [24]:
from sklearn.multioutput import MultiOutputRegressor
from sklearn.linear_model import LinearRegression
import random

def rolling_window_split_by_60(data):
    window_size = 60
    num_windows = len(data) // window_size
    windows_data = [data[i*window_size : (i+1)*window_size] for i in range(num_windows)]
    return windows_data

def estimate_icp(abp, cbfv, fs, icp):
    # Convert signals to numpy arrays for computation
    abp = np.array(abp)
    cbfv = np.array(cbfv)
    
    def equation(params):
        R, C = params
        q = cbfv / R
        dq_dt = np.gradient(q, 1/fs)
        return np.mean(abp - R * q - C * dq_dt)  # Example equation
    
    def objective_function( params, ground_truth):
        prediction = equation(params)
        return np.mean((prediction - ground_truth)**2)
    
    initial = [0.85,0.20]
    result = minimize(objective_function, initial, args=(icp,), method='BFGS')
    return equation(result.x), result.x

def estimate_icp_equation(abp, cbfv, fs, R, C):
    q = cbfv / R
    dq_dt = np.gradient(q, 1/fs)
    return np.mean(abp - R * q - C * dq_dt)

def rc_data(sample_fraction=0.5):
    actuals = []
    estimated = []
    train_data = []
    rc_y = []
    for key in patient_static_map.keys():
        for i,study in enumerate(patient_static_map[key].studies):
            data = study 
            waves= rolling_window_split_by_60(data)
            for wvform in random.sample(waves, k=int(sample_fraction*len(waves))):
                actual_icp = np.mean(wvform['ICP'])
                estimated_icp, params = estimate_icp(wvform['ABP'], wvform['CBFV'], 125, actual_icp)
                actuals.append(actual_icp)
                estimated.append(estimated_icp)
                train_data.append([np.mean(wvform['ABP']), np.mean(wvform['CBFV'])])
                rc_y.append(params)
    return train_data, rc_y

def estimate_optimizer():
    train, y = rc_data()
    model = LinearRegression()
    multi_output_model = MultiOutputRegressor(model)
    multi_output_model.fit(train, y)
    return multi_output_model

model = estimate_optimizer()

In [67]:
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import mean_squared_error
from math import sqrt
from sklearn.metrics import r2_score
from sklearn.metrics import mean_squared_log_error
from sklearn.metrics import median_absolute_error
from sklearn.metrics import explained_variance_score
from sklearn.linear_model import Ridge

def split_study_blocks_by_beat(data):
    new_header = [('Mean ABP', float), ('Mean CBFV', float), ('ICP Estimate', float)]
   
    beats = []
    estimates = []
    labels = []
    for wvform in rolling_window_split_by_60(data):
        mean_abp = np.mean(wvform['ABP'])
        mean_cbfv = np.mean(wvform['CBFV'])
        mean_icp = np.mean(wvform['ICP'])
        params = model.predict([[mean_abp, mean_cbfv]])
        estimate = estimate_icp_equation(np.array(wvform['ABP']), np.array(wvform['CBFV']), 125, params[0][0], params[0][1])
        data = [mean_abp, mean_cbfv, estimate*0.05]
        beats.append(data)
        labels.append(mean_icp)
        estimates.append(estimate)

    # iterations = np.arange(1, 51)  # Iterations from 1 to 10
    # # Creating the plot
    # plt.figure(figsize=(10, 6))
    # plt.plot(iterations, estimates[:50], label='Estimates', marker='o')  # Plot Array 1
    # plt.plot(iterations, labels[:50], label='Actual', marker='x')  # Plot Array 2
    # plt.title('Comparison of ICP vs Estimated ICP')
    # plt.xlabel('Iteration per 60 beats')
    # plt.ylabel('ICP')
    # plt.legend()
    # plt.grid(True)

    # Show the plot
    # plt.show()
    return np.array(beats), np.array(labels)

def rolling_split_by_window(data, labels, window):
    train_window = int(window * 0.6)
    test_window = int(window*0.2)

    X = data[:train_window]
    y = labels[len(labels) - test_window-1 : ]
    X = np.mean(X, axis=0)
    y = np.mean(y)
    return X,y

def get_MSE(actual, predicted):
    """Calculate the mean squared error between actual and predicted values."""
    return np.mean((actual - predicted) ** 2)

def test_all_studies(key):
    studies = patient_static_map[key].studies
    
    y_tests = []
    y_preds = []
    for study in studies:
        beats, lbl = split_study_blocks_by_beat(study)
        window = 20
        current = 0
        X = []
        y = []

        while current+window < len(beats):
            smallX, smallY = rolling_split_by_window(beats[current:current+window], lbl[current:current+window], window)
            X.append(smallX)
            y.append(smallY)
            current += 1

        X = np.array(X)
        y = np.array(y)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

        linear_model = LinearRegression()
        linear_model.fit(X_train, y_train)
        
        # Make predictions
        y_pred = linear_model.predict(X_test)

        # Evaluate the model
        # mse = get_MSE(y_test, y_pred)
        # print("Mean Squared Error:", mse)
        # mses.append(mse)
        # explainer = shap.Explainer(linear_model, X_train)

        # # Compute SHAP values
        # shap_values = explainer(X_test)
        # # Summary plot
        # shap.summary_plot(shap_values, X_test, feature_names=['ABP', 'CPP', 'Estimate ICP'])

        # Dependence plot for a specific feature
        # shap.dependence_plot("Feature 1", shap_values.values, X_test, feature_names=['ABP', 'CPP', 'Estimate ICP'])

        # # Waterfall plot for the first prediction
        # shap.waterfall_plot(shap_values[0])
        y_tests.append(y_test)
        y_preds.append(y_pred)
    return np.concatenate(y_tests),np.concatenate(y_preds)

def test_all_studies_forest(key):
    studies = patient_static_map[key].studies
    y_tests = []
    y_preds = []

    for study in studies:
        beats, lbl = split_study_blocks_by_beat(study)
        window = 20
        current = 0
        X = []
        y = []

        while current+window < len(beats):
            smallX, smallY = rolling_split_by_window(beats[current:current+window], lbl[current:current+window], window)
            X.append(smallX)
            y.append(smallY)
            current += 1

        X = np.array(X)
        y = np.array(y)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

        # Initialize the RandomForestRegressor
        forest = RandomForestRegressor(n_estimators=10)  # 100 trees in the forest

        # Fit the model
        forest.fit(X_train, y_train)

        
        # Make predictions
        y_pred = forest.predict(X_test)

        # Calculate MSE
        
        y_tests.append(y_test)
        y_preds.append(y_pred)
    return np.concatenate(y_tests),np.concatenate(y_preds)

def test_all_studies_ridge(key):
    studies = patient_static_map[key].studies
    
    y_tests = []
    y_preds = []
    for study in studies:
        beats, lbl = split_study_blocks_by_beat(study)
        window = 20
        current = 0
        X = []
        y = []

        while current+window < len(beats):
            smallX, smallY = rolling_split_by_window(beats[current:current+window], lbl[current:current+window], window)
            X.append(smallX)
            y.append(smallY)
            current += 1

        X = np.array(X)
        y = np.array(y)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

        linear_model = Ridge()
        linear_model.fit(X_train, y_train)
        
        # Make predictions
        y_pred = linear_model.predict(X_test)

        # Evaluate the model
        # mse = get_MSE(y_test, y_pred)
        # print("Mean Squared Error:", mse)
        # mses.append(mse)
        # explainer = shap.Explainer(linear_model, X_train)

        # # Compute SHAP values
        # shap_values = explainer(X_test)
        # # Summary plot
        # shap.summary_plot(shap_values, X_test, feature_names=['ABP', 'CPP', 'Estimate ICP'])

        # Dependence plot for a specific feature
        # shap.dependence_plot("Feature 1", shap_values.values, X_test, feature_names=['ABP', 'CPP', 'Estimate ICP'])

        # # Waterfall plot for the first prediction
        # shap.waterfall_plot(shap_values[0])
        y_tests.append(y_test)
        y_preds.append(y_pred)
    return np.concatenate(y_tests),np.concatenate(y_preds)
def evaluate_performance(y_test,y_pred):
    mae = mean_absolute_error(y_test, y_pred)
    print("Mean Absolute Error:", mae)  
    rmse = sqrt(mean_squared_error(y_test, y_pred))
    print("Root Mean Squared Error:", rmse)
    r2 = r2_score(y_test, y_pred)
    print("R-squared:", r2)
    median_ae = median_absolute_error(y_test, y_pred)
    print("Median Absolute Error:", median_ae)
    explained_variance = explained_variance_score(y_test, y_pred)
    print("Explained Variance Score:", explained_variance)
# print(test_all_studies_forest('01'))
y_test,y_pred = test_all_studies_ridge('01')
evaluate_performance(y_test,y_pred)


Mean Absolute Error: 0.46036779888824164
Root Mean Squared Error: 0.7519628193793629
R-squared: 0.9336355416432183
Median Absolute Error: 0.2627453827284949
Explained Variance Score: 0.9337437211632618
