In [21]:
import numpy as np

# Function to load data and split into features and labels
def load_data(filepath):
	# Load the data from CSV file
	data = np.genfromtxt(filepath, delimiter=',', skip_header=1)  # skip_header is used if your CSV has a header row
	# Separate features and labels
	X = data[:, :-1]  # all rows, all columns except the last
	y = data[:, -1]   # all rows, last column
	return X, y

# Load datasets
X_train, y_train = load_data('../data/ABP_train_samples.csv')
X_val, y_val = load_data('../data/ABP_val_samples.csv')
X_test, y_test = load_data('../data/ABP_test_samples.csv')

# Print shapes to verify
print("Train Features Shape:", X_train.shape)
print("Train Labels Shape:", y_train.shape)
print("Validation Features Shape:", X_val.shape)
print("Validation Labels Shape:", y_val.shape)
print("Test Features Shape:", X_test.shape)
print("Test Labels Shape:", y_test.shape)

Train Features Shape: (17235, 3750)
Train Labels Shape: (17235,)
Validation Features Shape: (1915, 3750)
Validation Labels Shape: (1915,)
Test Features Shape: (8207, 3750)
Test Labels Shape: (8207,)


In [22]:
# Removing values from X_train that are less than or equal to 0 for more than 50% of duration
mask = X_train <= 0
count_le_zero = np.sum(mask, axis=1)
# Find the rows where more than 50% of the elements are <= 0
rows_to_remove = count_le_zero > (X_train.shape[1] / 2)

rows_to_keep = ~rows_to_remove

# Apply the mask to filter out the rows in X_train
X_train_filtered = X_train[rows_to_keep]
# Assuming y_train is a corresponding array of labels
y_train_filtered = y_train[rows_to_keep]

In [23]:
import numpy as np
from scipy import stats
from scipy.fft import fft
from scipy.stats import entropy
import scipy.signal as signal

def compute_time_domain_statistics(data):
	# Calculate mean
	mean = np.mean(data, axis=1)
	
	# Calculate median
	median = np.median(data, axis=1)
	
	# Calculate standard deviation
	std_dev = np.std(data, axis=1)
	
	# Calculate variance
	variance = np.var(data, axis=1)
	
	# Calculate interquartile range
	iqr = stats.iqr(data, axis=1)
	
	# Calculate skewness
	skewness = stats.skew(data, axis=1)
	
	# Calculate kurtosis
	scaled_data = data * 1e6  # Scale up the data to avoid precision issues
	kurtosis = stats.kurtosis(scaled_data, axis=1)
	
	# Calculate root mean square (RMS)
	rms = np.sqrt(np.mean(np.square(data), axis=1))
	
	# Calculate Shannon entropy
	def shannon_entropy(signal):
		# Normalize the signal
		s = np.sum(signal)
		
		prob_density = signal / (1 if s is None else s)
		# Use scipy's entropy function to calculate Shannon entropy
		return entropy(prob_density, base=2)
	
	shannon_entropy_values = np.apply_along_axis(shannon_entropy, 1, data)
	
	# Calculate mean and standard deviation of the first derivative
	first_derivative = np.diff(data, axis=1)
	mean_first_derivative = np.mean(first_derivative, axis=1)
	std_dev_first_derivative = np.std(first_derivative, axis=1)
	
	# Return a dictionary with all results
	return {
		'mean': mean,
		'median': median,
		'standard_deviation': std_dev,
		'variance': variance,
		'interquartile_range': iqr,
		'skewness': skewness,
		'kurtosis': kurtosis,
		'root_mean_square': rms,
		'shannon_entropy': shannon_entropy_values,
		'mean_first_derivative': mean_first_derivative,
		'std_dev_first_derivative': std_dev_first_derivative
	}

def compute_freq_domain_statistics(X, sample_rate = 125):
	features = {
		'first_moment': [],
		'second_moment': [],
		'third_moment': [],
		'fourth_moment': [],
		'median_frequency': [],
		'spectral_entropy': [],
		'total_spectral_power': [],
		'peak_amplitude': []
	}
	
	for signal in X:
		# Compute the FFT
		freq_data = fft(signal)
		# Get the power spectrum
		power_spectrum = np.abs(freq_data)**2
		# Get the frequencies for bins
		freqs = np.fft.fftfreq(len(signal), 1/sample_rate)
		
		# Consider only positive frequencies
		pos_mask = freqs > 0
		freqs = freqs[pos_mask]
		power_spectrum = power_spectrum[pos_mask]
		
		# Moments of the power spectrum
		total_power = np.sum(power_spectrum)
		average_power = power_spectrum / total_power
		features['first_moment'].append(np.sum(freqs * average_power))
		features['second_moment'].append(np.sum((freqs**2) * average_power))
		features['third_moment'].append(np.sum((freqs**3) * average_power))
		features['fourth_moment'].append(np.sum((freqs**4) * average_power))
		
		# Median frequency
		cumulative_power = np.cumsum(average_power)
		median_freq_index = np.where(cumulative_power >= 0.5)[0][0]
		features['median_frequency'].append(freqs[median_freq_index])
		
		# Spectral entropy
		features['spectral_entropy'].append(entropy(average_power))
		
		# Total spectral power
		features['total_spectral_power'].append(total_power)
		
		# Peak amplitude in 0 to 10 Hz
		relevant_mask = (freqs >= 0) & (freqs <= 10)
		peak_amplitude = np.max(power_spectrum[relevant_mask]) if np.any(relevant_mask) else 0
		features['peak_amplitude'].append(peak_amplitude)
	
	# Convert lists to numpy arrays for easier handling later
	for key in features:
		features[key] = np.array(features[key])

	return features


In [24]:
# Beat to Beat Analysis
def detect_systolic_peaks(abp_signal, sampling_rate):
	# Use a peak detection algorithm, scipy's find_peaks could be suitable
	peaks, _ = signal.find_peaks(abp_signal, distance=sampling_rate/2)  # distance based on typical human heart rate (0.5 sec apart)
	return peaks


def compute_intervals(peaks, sampling_rate):
	# Convert peak indices to time intervals in seconds
	intervals = np.diff(peaks) / sampling_rate
	return intervals


def compute_B2BInterval(data, sampling_rate=125):
	results = []
	for signal in data:
		peaks = detect_systolic_peaks(signal, sampling_rate)
		intervals = compute_intervals(peaks, sampling_rate)
		results.append(intervals)
	return results


def calculate_poincare_features(intervals):
    """
    Calculate the Poincaré plot features SD1 and SD2 for a given set of intervals.
    
    Args:
    intervals (np.array): numpy array of beat-to-beat intervals
    
    Returns:
    dict: dictionary with 'SD1', 'SD2', and 'SD1_SD2_ratio'
    """
    if len(intervals) < 2:
        return {'SD1': np.nan, 'SD2': np.nan, 'SD1_SD2_ratio': np.nan}

    # Calculate differences between consecutive intervals
    diff_intervals = np.diff(intervals)
    
    # Compute SD1 and SD2
    SD1 = np.sqrt(np.var(diff_intervals, ddof=1) / 2)
    SD2 = np.sqrt(2 * np.var(intervals, ddof=1) - (np.var(diff_intervals, ddof=1) / 2))
    
    # Compute the SD1/SD2 ratio
    SD1_SD2_ratio = SD1 / SD2
    
    return {'SD1': SD1, 'SD2': SD2, 'SD1_SD2_ratio': SD1_SD2_ratio}


def compute_poincare_features(interval_data):
    SD1 = []
    SD2 = []
    SD1_SD2_ratio = []

    for intervals in interval_data:
        result = calculate_poincare_features(intervals)
        SD1.append(result['SD1'])
        SD2.append(result['SD2'])
        SD1_SD2_ratio.append(result['SD1_SD2_ratio'])
    
    return {
        'SD1': np.array(SD1),
        'SD2': np.array(SD2),
        'SD1_SD2_ratio': np.array(SD1_SD2_ratio)
    }


In [25]:
train_time_domain_stats = compute_time_domain_statistics(X_train_filtered)
train_time_domain_stats_arr = np.column_stack(list(train_time_domain_stats.values()))

train_freq_domain_stats = compute_freq_domain_statistics(X_train_filtered)
train_freq_domain_stats_arr = np.column_stack(list(train_freq_domain_stats.values()))

X_train_B2Bintervals = compute_B2BInterval(X_train_filtered)
X_train_poincare_features = compute_poincare_features(X_train_B2Bintervals)
X_train_poincare_features_arr = np.column_stack(list(X_train_poincare_features.values()))


  SD2 = np.sqrt(2 * np.var(intervals, ddof=1) - (np.var(diff_intervals, ddof=1) / 2))


In [26]:
test_time_domain_stats = compute_time_domain_statistics(X_test)
test_time_domain_stats_arr = np.column_stack(list(test_time_domain_stats.values()))

test_freq_domain_stats = compute_freq_domain_statistics(X_test)
test_freq_domain_stats_arr = np.column_stack(list(test_freq_domain_stats.values()))

X_test_B2Bintervals = compute_B2BInterval(X_test)
X_test_poincare_features = compute_poincare_features(X_test_B2Bintervals)
X_test_poincare_features_arr = np.column_stack(list(X_test_poincare_features.values()))


In [27]:
print(train_time_domain_stats_arr.shape, train_freq_domain_stats_arr.shape, X_train_poincare_features_arr.shape)

print(test_time_domain_stats_arr.shape, test_freq_domain_stats_arr.shape, X_test_poincare_features_arr.shape)

(17159, 11) (17159, 8) (17159, 3)
(8207, 11) (8207, 8) (8207, 3)


In [28]:
train_features = np.hstack((train_time_domain_stats_arr, train_freq_domain_stats_arr, X_train_poincare_features_arr))

train_features_clean = np.nan_to_num(train_features, nan=0.0, posinf=0.0, neginf=0.0)

# Check the shape of the resulting array to ensure it is correct
print("Shape of the combined array:", train_features_clean.shape)

test_features = np.hstack((test_time_domain_stats_arr, test_freq_domain_stats_arr, X_test_poincare_features_arr))
test_features_clean = np.nan_to_num(test_features, nan=0.0, posinf=0.0, neginf=0.0)
print("Shape of the combined array:", test_features_clean.shape)



Shape of the combined array: (17159, 22)
Shape of the combined array: (8207, 22)


In [29]:
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC

# Assume train_features is loaded and has shape (597, 22)
# Assume y_train_filtered is loaded and has shape (597,)

# Create a scaler object
scaler = StandardScaler()

# Fit and transform the training data
X_train_scaled = scaler.fit_transform(train_features_clean)

X_test_scaled = scaler.transform(test_features_clean)

# Optional: Verify mean and std deviation
print("Mean (should be approx. 0):", np.mean(X_train_scaled, axis=0))
print("Standard Deviation (should be 1):", np.std(X_train_scaled, axis=0))


Mean (should be approx. 0): [-2.96832447e-15 -1.35395577e-16  1.61230148e-15 -1.61237265e-15
  9.77058052e-17 -3.59724812e-15 -8.35781056e-17  9.07136130e-16
 -4.99427607e-15 -8.12011129e-18 -2.07358688e-14 -1.10992863e-14
 -1.02606503e-15  1.17130826e-15 -2.09231975e-16 -3.91842926e-15
 -4.22503625e-15 -5.59198775e-14  1.39890430e-15 -1.00932336e-15
  1.03604209e-16 -1.71917718e-15]
Standard Deviation (should be 1): [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]


In [30]:
# Initialize the SVC (Support Vector Classifier) with RBF kernel
svm_classifier = SVC(kernel='rbf')

# Train the classifier
svm_classifier.fit(X_train_scaled, y_train_filtered)

# evaluate the classifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

y_pred = svm_classifier.predict(X_train_scaled)

print("Train Accuracy:", accuracy_score(y_train_filtered, y_pred))
print(classification_report(y_train_filtered, y_pred))
print(confusion_matrix(y_train_filtered, y_pred))


y_pred_test = svm_classifier.predict(X_test_scaled)

print("Test Accuracy:", accuracy_score(y_test, y_pred_test))
print("Classification report:", classification_report(y_test, y_pred_test))
print(confusion_matrix(y_test, y_pred_test))





Train Accuracy: 0.9513957689842065
              precision    recall  f1-score   support

         0.0       0.93      0.98      0.95      8618
         1.0       0.98      0.92      0.95      8541

    accuracy                           0.95     17159
   macro avg       0.95      0.95      0.95     17159
weighted avg       0.95      0.95      0.95     17159

[[8442  176]
 [ 658 7883]]
Test Accuracy: 0.9504081881320824
Classification report:               precision    recall  f1-score   support

         0.0       0.93      0.98      0.95      4104
         1.0       0.98      0.92      0.95      4103

    accuracy                           0.95      8207
   macro avg       0.95      0.95      0.95      8207
weighted avg       0.95      0.95      0.95      8207

[[4010   94]
 [ 313 3790]]
