<a href="https://colab.research.google.com/github/NainaMKumar/Arrythmia_detection/blob/main/Arrythmia_Detection_DL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install wfdb

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import scipy
import wfdb
import os
from pathlib import Path
from scipy import signal
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import MultiLabelBinarizer
from imblearn.over_sampling import RandomOverSampler

import tensorflow as tf
from tensorflow import keras
from keras import layers
from keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from sklearn.utils import class_weight
from keras import regularizers
from keras.preprocessing.image import ImageDataGenerator
from keras.optimizers import Adam
from keras.metrics import Precision
from keras.metrics import Recall
from keras.metrics import AUC

In [None]:
dataset_directory = Path('/content/mit-bih-arrhythmia-database-1.0.0.zip')

In [None]:
from zipfile import ZipFile

with ZipFile(dataset_directory, 'r') as zip:
  zip.extractall()
  print('Done')

In [None]:
dataset_directory = Path('/content/mit-bih-arrhythmia-database-1.0.0')

In [None]:
# Demo 1 - Read a WFDB record using the 'rdrecord' function into a wfdb.Record object.
# Plot the signals, and show the data.
record = wfdb.rdrecord(dataset_directory / '100')
wfdb.plot_wfdb(record=record, title='Record 100 from MIT BIH Arrythmia Database')
display(record.__dict__)

In [None]:
# Demo 2 - Read certain channels and sections of the WFDB record using the simplified 'rdsamp' function
# which returns a numpy array and a dictionary. Show the data.
signals, fields = wfdb.rdsamp(dataset_directory / '100', channels=[0, 1], sampfrom=100, sampto=15000)
display(signals)
display(fields)

In [None]:
# Extract signal data
signal_1 = signals[:, 0]
signal_2 = signals[:, 1]

# Extract sampling frequency and number of samples
fs = fields['fs']
n_samples = fields['sig_len']

# Calculate time array
time = np.arange(n_samples) / fs

# Create plot
plt.figure(figsize=(10, 6))
plt.plot(time, signal_1, label='Signal 1')
plt.plot(time, signal_2, label='Signal 2')
plt.xlabel('Time (s)')
plt.ylabel('Amplitude')
plt.title('Signal Plot')
plt.legend()
plt.show()

In [None]:
ecg_signals = []

for file in sorted(dataset_directory.glob('*.dat')):
    print(file.stem)
    signals, fields = wfdb.rdsamp(dataset_directory / file.stem, channels=[1], sampfrom=100, sampto=15000)
    print(signals)
    ecg_signals.append(signals)

ecg_signals = np.array(ecg_signals)
print(ecg_signals.shape)

In [None]:
def plot_signal(ecg_signal):

    # Extract sampling frequency and number of samples
    fs = fields['fs']
    signal_length = len(ecg_signal)

    # Calculate time array
    time = np.arange(0, signal_length) / fs

    # Create plot
    plt.figure(figsize=(10, 6))
    plt.plot(time, ecg_signal)
    plt.xlabel('Time (s)')
    plt.ylabel('Amplitude')

    plt.title('Signal Plot')
    plt.show()

In [None]:
def scale_signal(ecg_signal):
    # Find the minimum value in the signal
    min_val = np.min(ecg_signal)

    # Calculate the scaling factor to shift the signal above 0
    scaling_factor = abs(min_val) + 1  # Add 1 for a small margin

    # Scale the signal by adding the scaling factor
    scaled_signal = ecg_signal + scaling_factor

    return scaled_signal

In [None]:
ecg_signal = ecg_signals[0]
ecg_signal = np.array(ecg_signal)

scaled_ecg_signal = scale_signal(ecg_signal)
scaled_ecg_signal = np.array(scaled_ecg_signal)

In [None]:
def signal_filter_powerline(signal, sampling_rate, powerline=50):
    """Filter out 50 Hz powerline noise by smoothing the signal with a moving average kernel with the width of one
    period of 50Hz."""

    if sampling_rate >= 100:
        b = np.ones(int(sampling_rate / powerline))
    else:
        b = np.ones(2)
    a = [len(b)]
    y = scipy.signal.filtfilt(b, a, signal, axis = 0)
    return y

In [None]:
plot_signal(ecg_signal)

In [None]:
denoised_signal = signal_filter_powerline(ecg_signal, fields['fs'])
plot_signal(denoised_signal)

In [None]:
def remove_baseline_wander(ecg_signal, sampling_frequency, cutoff_frequency = 1.2, filter_order=3):
    nyquist_frequency = 0.5 * sampling_frequency
    normalized_cutoff = cutoff_frequency / nyquist_frequency

    # Design a high-pass Butterworth filter
    b, a = signal.butter(filter_order, normalized_cutoff, btype='high', analog=False)

    # Apply the high-pass filter to remove baseline wander
    baseline_removed_signal = signal.filtfilt(b, a, ecg_signal, axis = 0)

    return baseline_removed_signal

In [None]:
baseline_removed_signal = remove_baseline_wander(denoised_signal, fields['fs'])

plot_signal(ecg_signal)
plot_signal(baseline_removed_signal)

In [None]:
def normalize_signal(signal):
    norm_signal = (signal - np.min(signal)) / (np.max(signal) - np.min(signal))
    return norm_signal
    # scaler = MinMaxScaler()
    # signal = signal.reshape(-1, 1)
    # scaled_signal = scaler.fit_transform(signal)
    # return scaled_signal

In [None]:
normalized_signal = normalize_signal(baseline_removed_signal)
plot_signal(normalized_signal)

print(np.min(baseline_removed_signal))
print(np.max(baseline_removed_signal))
print(np.min(normalized_signal))
print(np.max(normalized_signal))


In [None]:
X = []

for ecg_signal in ecg_signals:
    denoised_signal = signal_filter_powerline(ecg_signal, fields['fs'])
    filtered_signal = remove_baseline_wander(denoised_signal, fields['fs'])
    normalized_signal = tf.keras.utils.normalize(np.array(filtered_signal))
    X.append(normalized_signal)

X = np.array(X)
# print(X)

In [None]:
y = []

for file in dataset_directory.glob('*.atr'):
    print(file.stem)
    annotations = wfdb.rdann(str(dataset_directory / file.stem), 'atr', sampto=15000)
    diagnosis = annotations.aux_note
    y.append(diagnosis)

print(y)
print(len(y))

In [None]:
y = [[item.strip().replace('\x00', '') for item in inner_list if item.strip()] for inner_list in y]

y = [[item.replace('(', '') for item in inner_list] for inner_list in y]
print(y)
print(len(y))

In [None]:
y_dict = {'N': 0, 'P': 1, 'B': 2, 'AFIB': 3, 'MISSB': 4, 'PREX': 5, 'VT': 6, 'VFL': 7, 'SBR': 8}
y = [[y_dict.get(k) for k in lst] for lst in y]
print(y)

In [None]:
mlb = MultiLabelBinarizer()
y_transformed = mlb.fit_transform(y)
print(y_transformed)
y_transformed= np.array(y_transformed)

num_classes = mlb.classes_

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y_transformed, test_size = 0.3, random_state = 42)

In [None]:
#data augmentation
noise_mean = 0
noise_std = 0.1


X_train_reshaped = np.expand_dims(X_train, axis=-1)

datagen = ImageDataGenerator(
    preprocessing_function=lambda x: x + np.random.normal(loc=noise_mean, scale=noise_std, size=x.shape)
)

generator = datagen.flow(X_train_reshaped, y_train, batch_size = 500, shuffle=True)
augmented_samples = next(generator)

augmented_signals = augmented_samples[0]
augmented_labels = augmented_samples[1]

# print(augmented_signals.shape)
augmented_signals = np.squeeze(augmented_signals, axis = (3))

X_train = np.concatenate((X_train, augmented_signals), axis=0)
y_train = np.concatenate((y_train, augmented_labels), axis=0)

print(X_train.shape)
print(y_train.shape)

In [None]:
print(y_train.sum(axis = 0))

In [None]:
print(X_train.shape)
print(X_test.shape)
print(y_train.shape)
print(y_test.shape)

In [None]:
#compute class weights

reverse_dict = {v: k for k, v in y_dict.items()}
class_frequencies = np.sum(y_transformed, axis=0)
print(class_frequencies)
num_classes = len(y_dict)

class_weights = {reverse_dict[i]: len(y_transformed) / (num_classes * class_frequencies[i]) for i in range(num_classes)}
class_weights = {y_dict[label]: class_weights[label] for label in class_weights}
print(class_weights)

In [None]:
model = keras.Sequential()
model.add(layers.Conv1D(filters = 32, kernel_size = 5, activation = 'relu'))
model.add(layers.MaxPooling1D(pool_size = 2))
model.add(layers.Conv1D(filters = 32, kernel_size = 5, activation = 'relu'))
model.add(layers.MaxPooling1D(pool_size = 2))
model.add(layers.Bidirectional(layers.LSTM(32)))
model.add(layers.Dense(100, activation = 'relu', kernel_regularizer = regularizers.l2(0.0003)))
model.add(layers.Dense(num_classes, activation = 'softmax', kernel_regularizer = regularizers.l2(0.0003)))


In [None]:
learning_rate = 0.0001
optimizer = Adam(learning_rate=learning_rate)

In [None]:
batch_size = 32

model.compile(
    loss= 'categorical_crossentropy',
    optimizer=optimizer,
    metrics = ['accuracy', Precision(), Recall(), AUC(curve = 'ROC')]
)

history = model.fit(
    X_train,
    y_train,
    epochs= 16,
    batch_size=batch_size,
    shuffle=False,
    validation_data = (X_test, y_test),
    class_weight = class_weights,
)

In [None]:
y_pred = model.predict(X_test)
print(y_pred)

In [None]:
np.argmax(y_pred, axis = 1)
print(y_test)

In [None]:
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.show()

In [None]:
plt.plot(history.history['auc'])
plt.plot(history.history['val_auc'])
plt.title('model auc')
plt.ylabel('auc')
plt.xlabel('epoch')
plt.legend(['train', 'val'], loc='upper left')
plt.show()

In [None]:
loss, accuracy, precision, recall, auc = model.evaluate(X_test, y_test, batch_size = 64)