In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Bidirectional, Dense, Flatten
from sklearn.decomposition import PCA
from scipy.fft import fft
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix
import seaborn as sns


url = 'https://archive.physionet.org/physiobank/database/challenge/2015/training/RECORDS'

response = requests.get(url)

# Check if the request was successful (status code 200)
if response.status_code == 200:

    text = response.text
    print("Successfully retrieve the patient Id's Status code:", response.status_code)
else:
    print("Failed to retrieve the text content. Status code:", response.status_code)

lines = text.strip().split('\n')

csv_file = 'record_names.csv'

with open(csv_file, 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerows([[line] for line in lines])

print(f'CSV file "{csv_file}" has been created.')


csv_file = 'record_names.csv'

patient_ids_df = pd.read_csv(csv_file, header=None)

patient_ids = patient_ids_df.to_numpy().reshape(-1)

print(patient_ids)

In [None]:
# Feature extraction - Add the most dense frequency capture
def capture_dominant_frequency(signal, fs):
    # Perform Fourier Transform on the signal to get frequency components
    N = len(signal)
    freqs = np.fft.fftfreq(N, d=1/fs)
    fft_vals = fft(signal)

    # Get the magnitude of the FFT results
    magnitude = np.abs(fft_vals)

    # Find the index of the highest magnitude, which corresponds to the dominant frequency
    dominant_frequency_index = np.argmax(magnitude[1:]) + 1  # Exclude the DC component
    dominant_frequency = np.abs(freqs[dominant_frequency_index])

    return dominant_frequency

from scipy.signal import find_peaks

def extract_ppg_features(ppg_signal, fs):
    # Peak Detection with higher threshold
    peak_height_threshold = 0.5 * (np.percentile(ppg_signal, 75) + np.median(ppg_signal)) # Might have to adjust accordingly
    peaks, _ = find_peaks(ppg_signal, height=peak_height_threshold)

    peak_features = []

    prev_peak_index = None

    for peak_index in peaks:

        window_size = int(fs * 2)  # Example: 0.5 seconds window
        peak_window = ppg_signal[max(0, peak_index - window_size):min(len(ppg_signal), peak_index + window_size)]

        # Morphological Features
        sa = np.max(peak_window)
        Da = np.min(peak_window)
        SA = np.trapz(peak_window, dx=1/fs)
        DA = np.trapz(peak_window, dx=1/fs) - SA
        St = peak_index / fs
        Dt = (len(ppg_signal) - peak_index) / fs

        # Frequency Domain Features
        fft_result = fft(peak_window)
        magnitude_spectrum = np.abs(fft_result)
        frequency_spectrum = np.fft.fftfreq(len(peak_window), d=1/fs)
        dominant_frequency = frequency_spectrum[np.argmax(magnitude_spectrum)]
        pmf = magnitude_spectrum / np.sum(magnitude_spectrum)
        spectral_entropy = -np.sum(pmf * np.log2(pmf + 1e-10))

        # Additional features
        if prev_peak_index is not None:
            ppi = (peak_index - prev_peak_index) / fs
        else:
            ppi = 0

        pi = ppi
        pw = calculate_pulse_width(peak_window, fs)
        fwhm = calculate_fwhm(peak_window, fs)

        signal_area = np.trapz(ppg_signal, dx=1/fs)
        rise_time = St - (max(0, peak_index - window_size)) / fs
        fall_time = (min(len(ppg_signal), peak_index + window_size) - peak_index) / fs

        amplitude_modulation_depth = sa - Da
        energy = np.sum(peak_window ** 2)
        zero_crossing_rate = calculate_zero_crossing_rate(peak_window)

        # Statistical Features
        mean = np.mean(peak_window)
        median = np.median(peak_window)
        std_deviation = np.std(peak_window)
        skewness = stats.skew(peak_window)
        kurtosis = stats.kurtosis(peak_window)
        min_value = np.min(peak_window)
        max_value = np.max(peak_window)
        variance = np.var(peak_window)

        # Additional features
        slope = calculate_slope(peak_window, fs)
        peak_count = len(peaks)

        # Handle exceptions for divide by zero errors
        try:
            amplitude_ratio = sa / Da
            area_ratio = SA / DA
            interval_ratio = pi / ppi if ppi != 0 else 0
        except ZeroDivisionError:
            amplitude_ratio = 0
            area_ratio = 0
            interval_ratio = 0

        # Append all features to the list
        peak_features.append({
            'sa': sa, 'Da': Da, 'SA': SA, 'DA': DA, 'St': St, 'Dt': Dt,
            'PI': pi, 'PPI': ppi, 'PW': pw, 'FWHM': fwhm,
            'Dominant_frequency': dominant_frequency,
            'Spectral_entropy': spectral_entropy,
            'Signal_area': signal_area,
            'Rise_time': rise_time,
            'Fall_time': fall_time,
            'Amplitude_modulation_depth': amplitude_modulation_depth,
            'Energy': energy,
            'Zero_crossing_rate': zero_crossing_rate,
            'Mean': mean,
            'Median': median,
            'Standard_deviation': std_deviation,
            'Skewness': skewness,
            'Kurtosis': kurtosis,
            'Min_value': min_value,
            'Max_value': max_value,
            'Variance': variance,
            'Slope': slope,
            'Peak_count': peak_count,
            'Amplitude_ratio': amplitude_ratio,
            'Area_ratio': area_ratio,
            'Interval_ratio': interval_ratio
            # Add more features as needed
        })

        prev_peak_index = peak_index

    return peak_features, peaks

def calculate_slope(signal, fs):
    time = np.arange(0, len(signal)) / fs
    slope, _ = np.polyfit(time, signal, 1)
    return slope

def calculate_pulse_width(peak_window, fs):
    half_max = 0.5 * np.max(peak_window)
    above_half = peak_window >= half_max
    crossings = np.where(above_half)[0]
    if len(crossings) < 2:
        return 0.0
    return (crossings[-1] - crossings[0]) / fs

def calculate_fwhm(peak_window, fs):
    half_max = 0.5 * np.max(peak_window)
    cross_indices = np.where(peak_window >= half_max)[0]
    if len(cross_indices) == 0:
        return 0.0
    return (cross_indices[-1] - cross_indices[0]) / fs

def calculate_zero_crossing_rate(signal):
    # Calculate zero crossing rate
    return np.sum(np.diff(np.sign(signal)) != 0) / len(signal)


In [None]:
# Apply PCA to reduce dimensionality of extracted features
def apply_pca(features, n_components=10):
    pca = PCA(n_components=n_components)
    scaled_features = StandardScaler().fit_transform(features)  # Scale features before PCA
    pca_features = pca.fit_transform(scaled_features)
    return pca_features

# Prepare dataset with PCA
dataset = []
i = 0
for label, patient_ids in patient_ids_by_activity.items():
    for patient_id in patient_ids:
        if patient_id not in pleth_signals:
            print(f"Patient ID {patient_id} not found in pleth_signal dictionary")
            continue
        fs = pleth_infos[patient_id]['frequency']
        segmented_signal = segment_signal(pleth_signals[patient_id], 10, fs)

        for segment in segmented_signal:
            segment_processed = bandpass_filter(segment, lowcut=0.05, highcut=10, fs=fs)
            segment_processed = moving_average(segment_processed, window_size=3)
            segment_processed = remove_baseline_wandering(segment_processed)

            if not any(segment_processed):
                continue

            features = extract_ppg_features(segment_processed, fs)
            for feature in features:
                peak_array = [feature[key] for key in feature.keys()]
                dataset.append((label, peak_array))
                i += 1

# Prepare labels and features for training
labels = [label for label, _ in dataset]
signals = [signal for _, signal in dataset]

# One-hot encode the labels
unique_labels = set(labels)
label_to_index = {label: i for i, label in enumerate(unique_labels)}
index_to_label = {i: label for label, i in label_to_index.items()}
encoded_labels = [to_categorical(label_to_index[label], num_classes=len(unique_labels)) for label in labels]

# Apply PCA to features
features = [feature for _, feature in dataset]
pca_features = apply_pca(features, n_components=10)

# Padding the sequences
max_length = max(len(signal) for signal in signals)
padded_signals = [np.pad(signal, (0, max_length - len(signal))) for signal in signals]

# Convert to numpy arrays
X = np.array(padded_signals)
y = np.array(encoded_labels)

# Add PCA features as additional input
X_pca = np.hstack([X.reshape(X.shape[0], -1), pca_features])

# Reshape for LSTM/BiLSTM input (no PCA in this section)
X_train, X_test, y_train, y_test = train_test_split(X_pca, y, test_size=0.2, random_state=42)

# Reshaping for LSTM/BiLSTM input
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)


In [None]:

# Create LSTM model with ANN after LSTM
def create_lstm_ann_model(input_shape, num_classes):
    model = Sequential([
        LSTM(64, return_sequences=True, input_shape=input_shape),
        LSTM(64),
        Dense(32, activation='relu'),
        Dense(32, activation='relu'),  # Adding ANN layer after LSTM
        Dense(num_classes, activation='softmax')
    ])
    return model

# Create BiLSTM model with ANN after BiLSTM
def create_bilstm_ann_model(input_shape, num_classes):
    model = Sequential([
        Bidirectional(LSTM(64, return_sequences=True), input_shape=input_shape),
        Bidirectional(LSTM(64)),
        Dense(32, activation='relu'),
        Dense(32, activation='relu'),  # Adding ANN layer after BiLSTM
        Dense(num_classes, activation='softmax')
    ])
    return model

# Model Creation
input_shape = X_train.shape[1:]  # Shape of input features
num_classes = len(unique_labels)

lstm_ann_model = create_lstm_ann_model(input_shape, num_classes)
bilstm_ann_model = create_bilstm_ann_model(input_shape, num_classes)

# Compile the models
lstm_ann_model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
bilstm_ann_model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

# Train the models
history_lstm_ann = lstm_ann_model.fit(X_train, y_train, epochs=50, batch_size=64, validation_data=(X_test, y_test))
history_bilstm_ann = bilstm_ann_model.fit(X_train, y_train, epochs=50, batch_size=64, validation_data=(X_test, y_test))

# Model evaluation
lstm_evaluation = lstm_ann_model.evaluate(X_test, y_test)
bilstm_evaluation = bilstm_ann_model.evaluate(X_test, y_test)
print("LSTM+ANN Evaluation:", lstm_evaluation)
print("BiLSTM+ANN Evaluation:", bilstm_evaluation)

# Save models
lstm_ann_model.save('/home/nakul/Documents/Python/Arrhythmia_Classification_PPG/lstm_ann_model.h5')
bilstm_ann_model.save('/home/nakul/Documents/Python/Arrhythmia_Classification_PPG/bilstm_ann_model.h5')

# Plotting the training and testing accuracy/loss
epochs = [i for i in range(50)]
fig, ax = plt.subplots(1, 2)

# LSTM+ANN Accuracy and Loss
ax[0].plot(epochs, history_lstm_ann.history['loss'], label='Training Loss')
ax[0].plot(epochs, history_lstm_ann.history['val_loss'], label='Testing Loss')
ax[0].set_title('LSTM+ANN Loss')
ax[0].legend()
ax[0].set_xlabel("Epochs")
ax[0].set_ylim(0, 1)

ax[1].plot(epochs, history_lstm_ann.history['accuracy'], label='Training Accuracy')
ax[1].plot(epochs, history_lstm_ann.history['val_accuracy'], label='Testing Accuracy')
ax[1].set_title('LSTM+ANN Accuracy')
ax[1].legend()
ax[1].set_xlabel("Epochs")
ax[1].set_ylim(0, 1)

plt.show()


In [None]:

# BiLSTM+ANN Accuracy and Loss
fig, ax = plt.subplots(1, 2)

ax[0].plot(epochs, history_bilstm_ann.history['loss'], label='Training Loss')
ax[0].plot(epochs, history_bilstm_ann.history['val_loss'], label='Testing Loss')
ax[0].set_title('BiLSTM+ANN Loss')
ax[0].legend()
ax[0].set_xlabel("Epochs")
ax[0].set_ylim(0, 1)

ax[1].plot(epochs, history_bilstm_ann.history['accuracy'], label='Training Accuracy')
ax[1].plot(epochs, history_bilstm_ann.history['val_accuracy'], label='Testing Accuracy')
ax[1].set_title('BiLSTM+ANN Accuracy')
ax[1].legend()
ax[1].set_xlabel("Epochs")
ax[1].set_ylim(0, 1)

plt.show()

# Confusion Matrix and Final Evaluation
lstm_predictions = lstm_ann_model.predict(X_test)
bilstm_predictions = bilstm_ann_model.predict(X_test)

lstm_pred_labels = np.argmax(lstm_predictions, axis=1)
bilstm_pred_labels = np.argmax(bilstm_predictions, axis=1)

y_test_labels = np.argmax(y_test, axis=1)

# Confusion Matrix for LSTM+ANN and BiLSTM+ANN
conf_matrix_lstm_ann = confusion_matrix(y_test_labels, lstm_pred_labels)
conf_matrix_bilstm_ann = confusion_matrix(y_test_labels, bilstm_pred_labels)

fig, ax = plt.subplots(1, 2, figsize=(12, 6))

sns.heatmap(conf_matrix_lstm_ann, annot=True, fmt='d', cmap='Blues', ax=ax[0], xticklabels=class_labels, yticklabels=class_labels)
ax[0].set_title('Confusion Matrix - LSTM+ANN')

sns.heatmap(conf_matrix_bilstm_ann, annot=True, fmt='d', cmap='Blues', ax=ax[1], xticklabels=class_labels, yticklabels=class_labels)
ax[1].set_title('Confusion Matrix - BiLSTM+ANN')

plt.show()
