In [4]:
import numpy as np
np.set_printoptions(suppress=True)
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.io import arff
from sklearn.model_selection import train_test_split
import matplotlib
matplotlib.rcParams["figure.figsize"] = (6, 4)
plt.style.use("ggplot")
import tensorflow as tf
from tensorflow import data
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.metrics import mae
from tensorflow.keras import layers
from tensorflow import keras
from sklearn.metrics import accuracy_score, recall_score, precision_score, confusion_matrix, f1_score, classification_report
import os

In [5]:
from sklearn.preprocessing import MinMaxScaler
import plotly.graph_objs as go
import plotly.express as px
from scipy.signal import medfilt, butter, filtfilt
import pywt
from sklearn.model_selection import train_test_split
import scipy.signal
from keras.models import Sequential
from keras.layers import LSTM, Dense, Reshape
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report
from sklearn.metrics import roc_auc_score

# NB: in this work the appropriete metric is Recall : ensure that all anomalies are detected

In [None]:
normal_df = pd.read_csv("/content/ptbdb_normal.csv")
anomaly_df = pd.read_csv("/content/ptbdb_abnormal.csv")
normal_df.head()

In [None]:
print("Shape of Normal data", normal_df.shape)
print("Shape of Abnormal data", anomaly_df.shape)

In [None]:
CLASS_NAMES = ["Normal", "Anomaly"]

normal_df_copy = normal_df.copy()
anomaly_df_copy = anomaly_df.copy()
print(anomaly_df_copy.columns.equals(normal_df_copy.columns))

In [58]:
normal_df_copy = normal_df_copy.set_axis(range(1, 189), axis=1)
anomaly_df_copy = anomaly_df_copy.set_axis(range(1, 189), axis=1)
normal_df_copy = normal_df_copy.assign(target = CLASS_NAMES[0])
anomaly_df_copy = anomaly_df_copy.assign(target = CLASS_NAMES[1])


df = pd.concat((normal_df_copy, anomaly_df_copy))

In [None]:
df.info()

In [59]:
# Extract the first 5 lines from each dataset this is eserved for the inference
presentation_normal_data = normal_df.head(5)
presentation_anomaly_data = anomaly_df.head(5)

normal_df = normal_df.drop(normal_df.index[:5])
anomaly_df = anomaly_df.drop(anomaly_df.index[:5])

In [None]:
normal_df.drop("target", axis=1, errors="ignore", inplace=True)
normal = normal_df.to_numpy()
anomaly_df.drop("target", axis=1, errors="ignore", inplace=True)
anomaly = anomaly_df.to_numpy()

X_train, X_test = train_test_split(normal, test_size=0.15, random_state=45, shuffle=True)
print(f"Train shape: {X_train.shape}, Test shape: {X_test.shape}, anomaly shape: {anomaly.shape}")

In [15]:
tf.keras.utils.set_random_seed(1024)

In [61]:
from keras.models import Sequential
from keras.layers import Dense, Conv1D, Flatten, Dropout, MaxPooling1D, BatchNormalization
from keras import regularizers

In [19]:
# Define the grid search parameters
optimizers = ['SGD', 'RMSprop', 'Adagrad', 'Adadelta', 'Adam', 'Adamax', 'Nadam']

# 2 methode

In [None]:
df.head()

In [None]:
unique_values = df['target'].unique()
unique_values

In [None]:
value_counts = df['target'].value_counts()
value_counts

In [None]:
plt.bar(value_counts.index, value_counts.values)
plt.xlabel("Target Value")
plt.ylabel("Count")
plt.title("Distribution of Target Values")
plt.show()

In [66]:
# Define a mapping dictionary
mapping = {'Normal': 1, 'Anomaly': 0}

# Apply the mapping to the 'target' column
df['target_numeric'] = df['target'].map(mapping)

In [None]:
df.head()

In [68]:
df.drop('target', axis=1, inplace=True)

In [None]:
df.head()

In [None]:
df.shape

In [None]:
df.isna().sum().sum()

In [None]:
abnormal = df[df['target_numeric'] ==0][:10]
normal = df[df['target_numeric'] ==1][:10]

fig = go.Figure()
leg  = [False] * abnormal.shape[0]
leg[0] = True

for i in range(abnormal.shape[0]):
    fig.add_trace(go.Scatter( x=np.arange(abnormal.shape[1]),y=abnormal.iloc[i,:],name='Abnormal ECG', mode='lines',  marker_color='rgba(255, 0, 0, 0.9)', showlegend= leg[i]))

for j in range(normal.shape[0]):
    fig.add_trace(go.Scatter( x=np.arange(normal.shape[1]),y=normal.iloc[j,:],name='Normal ECG',  mode='lines',  marker_color='rgba(0, 255, 0, 1)', showlegend= leg[j]))



fig.update_layout(xaxis_title="time", yaxis_title="Signal", title= {'text': 'Difference between different ECG', 'xanchor': 'center', 'yanchor': 'top', 'x':0.5} , bargap=0,)
fig.update_traces(opacity=0.5)
fig.show()

# Data preprocessing

# **Noise filtring**

In [73]:
ecg_data = df.iloc[:,:-1]
labels = df.iloc[:,-1]

In [74]:
scaler = MinMaxScaler(feature_range=(-1, 1))
ecg_data = scaler.fit_transform(ecg_data)

In [None]:
labels

# Plot the graphs of unfiltered and filtered signals

**Plot original ECG signal**

In [None]:
fig = go.Figure()
fig.add_trace(go.Scatter(x=np.arange(ecg_data.shape[0]), y=ecg_data[30], mode='lines', name='Original ECG signal'))

**Filtering techniques**

In [77]:
# Median filtering
ecg_medfilt = medfilt(ecg_data, kernel_size=3)

# Low-pass filtering
lowcut = 0.05
highcut = 20.0
nyquist = 0.5 * 360.0
low = lowcut / nyquist
high = highcut / nyquist
b, a = butter(4, [low, high], btype='band')
ecg_lowpass = filtfilt(b, a, ecg_data)

# Wavelet filtering
coeffs = pywt.wavedec(ecg_data, 'db4', level=1)
threshold = np.std(coeffs[-1]) * np.sqrt(2*np.log(len(ecg_data)))
coeffs[1:] = (pywt.threshold(i, value=threshold, mode='soft') for i in coeffs[1:])
ecg_wavelet = pywt.waverec(coeffs, 'db4')

**Plot filtered ECG signals**

In [None]:
fig.add_trace(go.Scatter(x=np.arange(ecg_medfilt.shape[0]), y=ecg_medfilt[30], mode='lines', name='Median filtered ECG signal'))
fig.add_trace(go.Scatter(x=np.arange(ecg_lowpass.shape[0]), y=ecg_lowpass[30], mode='lines', name='Low-pass filtered ECG signal'))
fig.add_trace(go.Scatter(x=np.arange(ecg_wavelet.shape[0]), y=ecg_wavelet[30], mode='lines', name='Wavelet filtered ECG signal'))
fig.show()

# Choosing the best filtering technique

## Note :
#### We will evaluate the filtering methods by calculating each algorithm’s mean squared error (MSE). The algorithm with the lowest MSE will be selected. To calculate MSE, both signals’ dataframe must have an equal number of rows. If the dataset is reduced after filtering, this step involves padding the data with zeros.

In [79]:
def pad_data(original_data,filtered_data):
  diff = original_data.shape[1] - filtered_data.shape[1]

  if diff > 0:

      padding = np.zeros((filtered_data.shape[0], original_data.shape[1]))
      padded_data = np.concatenate((filtered_data, padding))

  elif diff < 0:
      padded_data = filtered_data[:,:-abs(diff)]

  elif diff == 0:
      padded_data = filtered_data

  return padded_data

def mse(original_data, filtered_data):
    filter_data = pad_data(original_data,filtered_data)
    return np.mean((original_data - filter_data) ** 2)

**Calculate the MSE**

In [None]:
mse_value_m = mse(ecg_data, ecg_medfilt)
mse_value_l = mse(ecg_data, ecg_lowpass)
mse_value_w = mse(ecg_data, ecg_wavelet)
print("MSE value of Median Filtering:", mse_value_m)
print("MSE value of Low-pass Filtering:", mse_value_l)
print("MSE value of Wavelet Filtering:", mse_value_w)

# Splitting Data into Train & Test Set

In [81]:
X_train, X_test, y_train, y_test = train_test_split(ecg_wavelet, labels, test_size=0.2, random_state=42)

# Feature engineering: Choosing relevant features

In [82]:
features = []

for i in range(X_train.shape[0]):
    #Finding the R-peaks
    r_peaks = scipy.signal.find_peaks(X_train[i])[0]

    #Initialize lists to hold R-peak and T-peak amplitudes
    r_amplitudes = []
    t_amplitudes = []

    # Iterate through R-peak locations to find corresponding T-peak amplitudes
    for r_peak in r_peaks:
        # Find the index of the T-peak (minimum value) in the interval from R-peak to R-peak + 200 samples
        t_peak = np.argmin(X_train[i][r_peak:r_peak+200]) + r_peak
        #Append the R-peak amplitude and T-peak amplitude to the lists
        r_amplitudes.append(X_train[i][r_peak])
        t_amplitudes.append(X_train[i][t_peak])

    # extracting singular value metrics from the r_amplitudes
    std_r_amp = np.std(r_amplitudes)
    mean_r_amp = np.mean(r_amplitudes)
    median_r_amp = np.median(r_amplitudes)
    sum_r_amp = np.sum(r_amplitudes)
    # extracting singular value metrics from the t_amplitudes
    std_t_amp = np.std(t_amplitudes)
    mean_t_amp = np.mean(t_amplitudes)
    median_t_amp = np.median(t_amplitudes)
    sum_t_amp = np.sum(t_amplitudes)

    # Find the time between consecutive R-peaks
    rr_intervals = np.diff(r_peaks)

    # Calculate the time duration of the data collection
    time_duration = (len(X_train[i]) - 1) / 1000 # assuming data is in ms

    # Calculate the sampling rate
    sampling_rate = len(X_train[i]) / time_duration

    # Calculate heart rate
    duration = len(X_train[i]) / sampling_rate
    heart_rate = (len(r_peaks) / duration) * 60

    # QRS duration
    qrs_duration = []
    for j in range(len(r_peaks)):
        qrs_duration.append(r_peaks[j]-r_peaks[j-1])
    # extracting singular value metrics from the qrs_durations
    std_qrs = np.std(qrs_duration)
    mean_qrs = np.mean(qrs_duration)
    median_qrs = np.median(qrs_duration)
    sum_qrs = np.sum(qrs_duration)

    # Extracting the singular value metrics from the RR-interval
    std_rr = np.std(rr_intervals)
    mean_rr = np.mean(rr_intervals)
    median_rr = np.median(rr_intervals)
    sum_rr = np.sum(rr_intervals)

    # Extracting the overall standard deviation
    std = np.std(X_train[i])

    # Extracting the overall mean
    mean = np.mean(X_train[i])

    # Appending the features to the list
    features.append([mean, std, std_qrs, mean_qrs,median_qrs, sum_qrs, std_r_amp, mean_r_amp, median_r_amp, sum_r_amp, std_t_amp, mean_t_amp, median_t_amp, sum_t_amp, sum_rr, std_rr, mean_rr,median_rr, heart_rate])

# Converting the list to a numpy array
features = np.array(features)

In [None]:
pd.DataFrame(features)

In [None]:
X_train.shape

In [85]:
# Initializing an empty list to store the features
X_test_fe = []

# Extracting features for each sample
for i in range(X_test.shape[0]):
    # Finding the R-peaks
    r_peaks = scipy.signal.find_peaks(X_test[i])[0]

    # Initialize lists to hold R-peak and T-peak amplitudes
    r_amplitudes = []
    t_amplitudes = []

    # Iterate through R-peak locations to find corresponding T-peak amplitudes
    for r_peak in r_peaks:
        # Find the index of the T-peak (minimum value) in the interval from R-peak to R-peak + 200 samples
        t_peak = np.argmin(X_test[i][r_peak:r_peak+200]) + r_peak
        # Append the R-peak amplitude and T-peak amplitude to the lists
        r_amplitudes.append(X_test[i][r_peak])
        t_amplitudes.append(X_test[i][t_peak])
    #extracting singular value metrics from the r_amplitudes
    std_r_amp = np.std(r_amplitudes)
    mean_r_amp = np.mean(r_amplitudes)
    median_r_amp = np.median(r_amplitudes)
    sum_r_amp = np.sum(r_amplitudes)
    #extracting singular value metrics from the t_amplitudes
    std_t_amp = np.std(t_amplitudes)
    mean_t_amp = np.mean(t_amplitudes)
    median_t_amp = np.median(t_amplitudes)
    sum_t_amp = np.sum(t_amplitudes)

    # Find the time between consecutive R-peaks
    rr_intervals = np.diff(r_peaks)

    # Calculate the time duration of the data collection
    time_duration = (len(X_test[i]) - 1) / 1000 # assuming data is in ms

    # Calculate the sampling rate
    sampling_rate = len(X_test[i]) / time_duration

    # Calculate heart rate
    duration = len(X_test[i]) / sampling_rate
    heart_rate = (len(r_peaks) / duration) * 60

    # QRS duration
    qrs_duration = []
    for j in range(len(r_peaks)):
        qrs_duration.append(r_peaks[j]-r_peaks[j-1])
    #extracting singular value metrics from the qrs_duartions
    std_qrs = np.std(qrs_duration)
    mean_qrs = np.mean(qrs_duration)
    median_qrs = np.median(qrs_duration)
    sum_qrs = np.sum(qrs_duration)

    # Extracting the standard deviation of the RR-interval
    std_rr = np.std(rr_intervals)
    mean_rr = np.mean(rr_intervals)
    median_rr = np.median(rr_intervals)
    sum_rr = np.sum(rr_intervals)

      # Extracting the standard deviation of the RR-interval
    std = np.std(X_test[i])

    # Extracting the mean of the RR-interval
    mean = np.mean(X_test[i])

    # Appending the features to the list
    X_test_fe.append([mean, std,  std_qrs, mean_qrs,median_qrs, sum_qrs, std_r_amp, mean_r_amp, median_r_amp, sum_r_amp, std_t_amp, mean_t_amp, median_t_amp, sum_t_amp, sum_rr, std_rr, mean_rr,median_rr,heart_rate])

# Converting the list to a numpy array
X_test_fe = np.array(X_test_fe)

In [None]:
X_test_fe.shape

# Model Building and Training

In [None]:
# Define the number of features in the train dataframe
num_features = features.shape[1]

# Reshape the features data to be in the right shape for LSTM input
features = np.asarray(features).astype('float32')
features = features.reshape(features.shape[0], features.shape[1], 1)

X_test_fe = np.asarray(X_test_fe).astype('float32')
X_test_fe = X_test_fe.reshape(X_test_fe.shape[0], X_test_fe.shape[1], 1)

# Define the model architecture
model = Sequential()
model.add(LSTM(64, input_shape=(features.shape[1], 1)))
model.add(Dense(1, activation='sigmoid'))

# Compile the model
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Time the training process
start_time = time.time()

# Train the model
history = model.fit(features, y_train, validation_data=(X_test_fe, y_test), epochs=50, batch_size=32, verbose=1)

# Calculate the training time
training_time = time.time() - start_time
print("Training Time:", training_time, "seconds")

# Plot training and validation loss
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Plot training and validation accuracy
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Model Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

# Make predictions on the validation set
y_pred = model.predict(X_test_fe)

# Convert the predicted values to binary labels
y_pred = [1 if p > 0.5 else 0 for p in y_pred]


In [None]:
model.summary()

# Evaluation of the Model

In [None]:
# calculate the accuracy
acc = accuracy_score(y_test, y_pred)

#calculate the AUC score
auc = round(roc_auc_score(y_test, y_pred),2)

#classification report provides all metrics e.g. precision, recall, etc.
all_met = classification_report(y_test, y_pred)

# Print the accuracy
print("Accuracy: ", acc*100, "%")
print(" \n")
print("AUC:", auc)
print(" \n")
print("Classification Report: \n", all_met)
print(" \n")


# Calculate the confusion matrix
conf_mat = confusion_matrix(y_test, y_pred)
conf_mat_df = pd.DataFrame(conf_mat, columns=['Predicted Negative', 'Predicted Positive'], index=['Actual Negative', 'Actual Positive'])

fig = px.imshow(conf_mat_df, text_auto= True, color_continuous_scale='Blues')
fig.update_xaxes(side='top', title_text='Predicted')
fig.update_yaxes(title_text='Actual')
fig.show()

In [None]:
# Plot training and validation error
fig = go.Figure()
fig.add_trace(go.Scatter( y=history.history['loss'], mode='lines', name='Training'))
fig.add_trace(go.Scatter( y=history.history['val_loss'], mode='lines', name='Validation'))
fig.update_layout(xaxis_title="Epoch", yaxis_title="Error", title= {'text': 'Model Error', 'xanchor': 'center', 'yanchor': 'top', 'x':0.5} , bargap=0)
fig.show()

# using bilstm

In [None]:
from keras.layers import LSTM, Dense, Bidirectional
from keras.models import Sequential

# Define the number of features in the train dataframe
num_features = features.shape[1]

# Reshape the features data to be in the right shape for LSTM input
features = np.asarray(features).astype('float32')
features = features.reshape(features.shape[0], features.shape[1], 1)

X_test_fe = np.asarray(X_test_fe).astype('float32')
X_test_fe = X_test_fe.reshape(X_test_fe.shape[0], X_test_fe.shape[1], 1)

# Define the model architecture
model = Sequential()
model.add(Bidirectional(LSTM(64), input_shape=(features.shape[1], 1)))
model.add(Dense(1, activation='sigmoid'))

# Compile the model
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Train the model
history = model.fit(features, y_train, validation_data=(X_test_fe, y_test), epochs=50, batch_size=32)

# Calculate the training time
training_time = time.time() - start_time
print("Training Time:", training_time, "seconds")

# Plot training and validation loss
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Plot training and validation accuracy
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Model Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

# Make predictions on the validation set
y_pred = model.predict(X_test_fe)

# Convert the predicted values to binary labels
y_pred = [1 if p > 0.5 else 0 for p in y_pred]

In [None]:
model.summary()

# Model evaluation

In [None]:
# calculate the accuracy
acc = accuracy_score(y_test, y_pred)

#calculate the AUC score
auc = round(roc_auc_score(y_test, y_pred),2)

#classification report provides all metrics e.g. precision, recall, etc.
all_met = classification_report(y_test, y_pred)

# Print the accuracy
print("Accuracy: ", acc*100, "%")
print(" \n")
print("AUC:", auc)
print(" \n")
print("Classification Report: \n", all_met)
print(" \n")


# Calculate the confusion matrix
conf_mat = confusion_matrix(y_test, y_pred)
conf_mat_df = pd.DataFrame(conf_mat, columns=['Predicted Negative', 'Predicted Positive'], index=['Actual Negative', 'Actual Positive'])

fig = px.imshow(conf_mat_df, text_auto= True, color_continuous_scale='Blues')
fig.update_xaxes(side='top', title_text='Predicted')
fig.update_yaxes(title_text='Actual')
fig.show()

In [None]:
!pip install keras

In [None]:
# Plot training and validation error
fig = go.Figure()
fig.add_trace(go.Scatter( y=history.history['loss'], mode='lines', name='Training'))
fig.add_trace(go.Scatter( y=history.history['val_loss'], mode='lines', name='Validation'))
fig.update_layout(xaxis_title="Epoch", yaxis_title="Error", title= {'text': 'Model Error', 'xanchor': 'center', 'yanchor': 'top', 'x':0.5} , bargap=0)
fig.show()

# stack lstm , GRU, ConvLSTM

In [None]:
from keras.layers import LSTM, Dense, Bidirectional, TimeDistributed, Conv1D, MaxPooling1D, Flatten, Dropout, ConvLSTM2D, GRU
from keras.models import Sequential

# Define the number of features in the train dataframe
num_features = features.shape[1]

# Reshape the features data to be in the right shape for LSTM input
features = np.asarray(features).astype('float32')
features = features.reshape(features.shape[0], features.shape[1], 1)

X_test_fe = np.asarray(X_test_fe).astype('float32')
X_test_fe = X_test_fe.reshape(X_test_fe.shape[0], X_test_fe.shape[1], 1)

# Stacked LSTM
stacked_lstm_model = Sequential()
stacked_lstm_model.add(LSTM(64, return_sequences=True, input_shape=(features.shape[1], 1)))
stacked_lstm_model.add(LSTM(64))
stacked_lstm_model.add(Dense(1, activation='sigmoid'))

# GRU
gru_model = Sequential()
gru_model.add(GRU(64, input_shape=(features.shape[1], 1)))
gru_model.add(Dense(1, activation='sigmoid'))

# CNN-LSTM
cnn_lstm_model = Sequential()
cnn_lstm_model.add(Conv1D(filters=64, kernel_size=3, activation='relu', input_shape=(features.shape[1], 1)))
cnn_lstm_model.add(MaxPooling1D(pool_size=2))
cnn_lstm_model.add(LSTM(64))
cnn_lstm_model.add(Dense(1, activation='sigmoid'))



# Compile the models
stacked_lstm_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
gru_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
cnn_lstm_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])


# Train the models
print('stacked_lstm : ')
stacked_lstm_history = stacked_lstm_model.fit(features, y_train, validation_data=(X_test_fe, y_test), epochs=50, batch_size=32)
print('gru : ')
gru_history = gru_model.fit(features, y_train, validation_data=(X_test_fe, y_test), epochs=50, batch_size=32)
print('cnn_lstm : ')
cnn_lstm_history = cnn_lstm_model.fit(features, y_train, validation_data=(X_test_fe, y_test), epochs=50, batch_size=32)







# Plot Stacked LSTM accuracy
plt.plot(stacked_lstm_history.history['accuracy'], label='Stacked LSTM Training Accuracy', color='blue')
plt.plot(stacked_lstm_history.history['val_accuracy'], label='Stacked LSTM Validation Accuracy', color='skyblue')
plt.title('Stacked LSTM Model Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

# Plot Stacked LSTM loss
plt.plot(stacked_lstm_history.history['loss'], label='Stacked LSTM Training Loss', color='red')
plt.plot(stacked_lstm_history.history['val_loss'], label='Stacked LSTM Validation Loss', color='orange')
plt.title('Stacked LSTM Model Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Plot GRU accuracy
plt.plot(gru_history.history['accuracy'], label='GRU Training Accuracy', color='green')
plt.plot(gru_history.history['val_accuracy'], label='GRU Validation Accuracy', color='lightgreen')
plt.title('GRU Model Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

# Plot GRU loss
plt.plot(gru_history.history['loss'], label='GRU Training Loss', color='purple')
plt.plot(gru_history.history['val_loss'], label='GRU Validation Loss', color='violet')
plt.title('GRU Model Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Plot CNN-LSTM accuracy
plt.plot(cnn_lstm_history.history['accuracy'], label='CNN-LSTM Training Accuracy', color='cyan')
plt.plot(cnn_lstm_history.history['val_accuracy'], label='CNN-LSTM Validation Accuracy', color='lightblue')
plt.title('CNN-LSTM Model Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

# Plot CNN-LSTM loss
plt.plot(cnn_lstm_history.history['loss'], label='CNN-LSTM Training Loss', color='magenta')
plt.plot(cnn_lstm_history.history['val_loss'], label='CNN-LSTM Validation Loss', color='pink')
plt.title('CNN-LSTM Model Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

y_pred =stacked_lstm_model.predict(X_test_fe)
y_pred_stack = [1 if p > 0.5 else 0 for p in y_pred_stack]

y_pred =gru_model.predict(X_test_fe)
y_pred_gru_model = [1 if p > 0.5 else 0 for p in y_pred_gru_model]

y_pred =cnn_lstm_model.predict(X_test_fe)
y_pred_cnn_lstm = [1 if p > 0.5 else 0 for p in y_pred_cnn_lstm]

In [None]:
stacked_lstm_model.summary()

In [None]:
gru_model.summary()

In [None]:
cnn_lstm_model.summary()

# Models evaluation

# Stack Lstm

In [None]:
# Make predictions on the validation set
y_pred_stakLstm = stacked_lstm_model.predict(X_test_fe)
# Convert the predicted values to binary labels
y_pred_stakLstm = [1 if p > 0.5 else 0 for p in y_pred_stakLstm]


# calculate the accuracy
acc = accuracy_score(y_test, y_pred_stakLstm)

#calculate the AUC score
auc = round(roc_auc_score(y_test, y_pred_stakLstm),2)

#classification report provides all metrics e.g. precision, recall, etc.
all_met = classification_report(y_test, y_pred_stakLstm)

# Print the accuracy
print("Accuracy: ", acc*100, "%")
print(" \n")
print("AUC:", auc)
print(" \n")
print("Classification Report: \n", all_met)
print(" \n")


# Calculate the confusion matrix
conf_mat = confusion_matrix(y_test, y_pred_stakLstm)
conf_mat_df = pd.DataFrame(conf_mat, columns=['Predicted Negative', 'Predicted Positive'], index=['Actual Negative', 'Actual Positive'])

fig = px.imshow(conf_mat_df, text_auto= True, color_continuous_scale='Blues')
fig.update_xaxes(side='top', title_text='Predicted')
fig.update_yaxes(title_text='Actual')
fig.show()

In [None]:
# Plot training and validation error
fig = go.Figure()
fig.add_trace(go.Scatter( y=stacked_lstm_history.history['loss'], mode='lines', name='Training'))
fig.add_trace(go.Scatter( y=stacked_lstm_history.history['val_loss'], mode='lines', name='Validation'))
fig.update_layout(xaxis_title="Epoch", yaxis_title="Error", title= {'text': 'Model Error', 'xanchor': 'center', 'yanchor': 'top', 'x':0.5} , bargap=0)
fig.show()

# GRU

In [None]:
# Make predictions on the validation set
y_pred_gru_history = gru_model.predict(X_test_fe)
# Convert the predicted values to binary labels
y_pred_gru_history = [1 if p > 0.5 else 0 for p in y_pred_gru_history]


In [None]:
acc = accuracy_score(y_test, y_pred_gru_history)

#calculate the AUC score
auc = round(roc_auc_score(y_test, y_pred_gru_history),2)

#classification report provides all metrics e.g. precision, recall, etc.
all_met = classification_report(y_test, y_pred_gru_history)

# Print the accuracy
print("Accuracy: ", acc*100, "%")
print(" \n")
print("AUC:", auc)
print(" \n")
print("Classification Report: \n", all_met)
print(" \n")


# Calculate the confusion matrix
conf_mat = confusion_matrix(y_test, y_pred_gru_history)
conf_mat_df = pd.DataFrame(conf_mat, columns=['Predicted Negative', 'Predicted Positive'], index=['Actual Negative', 'Actual Positive'])

fig = px.imshow(conf_mat_df, text_auto= True, color_continuous_scale='Blues')
fig.update_xaxes(side='top', title_text='Predicted')
fig.update_yaxes(title_text='Actual')
fig.show()

In [None]:
# Plot training and validation error
fig = go.Figure()
fig.add_trace(go.Scatter( y=gru_history.history['loss'], mode='lines', name='Training'))
fig.add_trace(go.Scatter( y=gru_history.history['val_loss'], mode='lines', name='Validation'))
fig.update_layout(xaxis_title="Epoch", yaxis_title="Error", title= {'text': 'Model Error', 'xanchor': 'center', 'yanchor': 'top', 'x':0.5} , bargap=0)
fig.show()

# ConvLSTM

In [None]:
# Make predictions on the validation set
y_pred_cnnlstm = cnn_lstm_model.predict(X_test_fe)
# Convert the predicted values to binary labels
y_pred_cnnlstm = [1 if p > 0.5 else 0 for p in y_pred_gru_history]

In [None]:
acc = accuracy_score(y_test, y_pred_cnnlstm)

#calculate the AUC score
auc = round(roc_auc_score(y_test, y_pred_cnnlstm),2)

#classification report provides all metrics e.g. precision, recall, etc.
all_met = classification_report(y_test, y_pred_cnnlstm)

# Print the accuracy
print("Accuracy: ", acc*100, "%")
print(" \n")
print("AUC:", auc)
print(" \n")
print("Classification Report: \n", all_met)
print(" \n")


# Calculate the confusion matrix
conf_mat = confusion_matrix(y_test, y_pred_cnnlstm)
conf_mat_df = pd.DataFrame(conf_mat, columns=['Predicted Negative', 'Predicted Positive'], index=['Actual Negative', 'Actual Positive'])

fig = px.imshow(conf_mat_df, text_auto= True, color_continuous_scale='Blues')
fig.update_xaxes(side='top', title_text='Predicted')
fig.update_yaxes(title_text='Actual')
fig.show()

In [None]:
# Plot training and validation error
fig = go.Figure()
fig.add_trace(go.Scatter( y=cnn_lstm_history.history['loss'], mode='lines', name='Training'))
fig.add_trace(go.Scatter( y=cnn_lstm_history.history['val_loss'], mode='lines', name='Validation'))
fig.update_layout(xaxis_title="Epoch", yaxis_title="Error", title= {'text': 'Model Error', 'xanchor': 'center', 'yanchor': 'top', 'x':0.5} , bargap=0)
fig.show()

# Autoencoders

In [109]:
raw_data = df.values

In [110]:
labels = raw_data[:, -1]

# The other data points are the electrocadriogram data
data = raw_data[:, 0:-1]


In [111]:
# Wavelet filtering
coeffs = pywt.wavedec(data, 'db4', level=1)
threshold = np.std(coeffs[-1]) * np.sqrt(2*np.log(len(data)))
coeffs[1:] = (pywt.threshold(i, value=threshold, mode='soft') for i in coeffs[1:])
data = pywt.waverec(coeffs, 'db4')


In [112]:
train_data, test_data, train_labels, test_labels = train_test_split(
    data, labels, test_size=0.2, random_state=21
)

In [None]:
labels

In [114]:
min_val = tf.reduce_min(train_data)
max_val = tf.reduce_max(train_data)

train_data = (train_data - min_val) / (max_val - min_val)
test_data = (test_data - min_val) / (max_val - min_val)

train_data = tf.cast(train_data, tf.float32)
test_data = tf.cast(test_data, tf.float32)

In [115]:
train_labels = train_labels.astype(bool)
test_labels = test_labels.astype(bool)

normal_train_data = train_data[train_labels]
normal_test_data = test_data[test_labels]

anomalous_train_data = train_data[~train_labels]
anomalous_test_data = test_data[~test_labels]

In [None]:
plt.grid()
plt.plot(np.arange(188), normal_train_data[0])
plt.title("A Normal ECG")
plt.show()

In [None]:
plt.grid()
plt.plot(np.arange(188), anomalous_train_data[0])
plt.title("An Anomalous ECG")
plt.show()

In [118]:
class AnomalyDetector(Model):
  def __init__(self):
    super(AnomalyDetector, self).__init__()
    self.encoder = tf.keras.Sequential([
      layers.Dense(40, activation="relu"),
      layers.Dense(20, activation="relu"),
      layers.Dense(10, activation="relu")])

    self.decoder = tf.keras.Sequential([
      layers.Dense(20, activation="relu"),
      layers.Dense(40, activation="relu"),
      layers.Dense(188, activation="sigmoid")])

  def call(self, x):
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded

autoencoder = AnomalyDetector()

In [119]:
autoencoder.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

In [None]:
history = autoencoder.fit(normal_train_data, normal_train_data,
          epochs=80,
          batch_size=512,
          validation_data=(test_data, test_data),
          shuffle=True)

In [None]:
plt.plot(history.history["loss"], label="Training Loss")
plt.plot(history.history["val_loss"], label="Validation Loss")
plt.legend()

In [None]:
encoded_data = autoencoder.encoder(normal_test_data).numpy()
decoded_data = autoencoder.decoder(encoded_data).numpy()

plt.plot(normal_test_data[0], 'b')
plt.plot(decoded_data[0], 'r')
plt.fill_between(np.arange(188), decoded_data[0], normal_test_data[0], color='lightcoral')
plt.legend(labels=["Input", "Reconstruction", "Error"])
plt.show()

In [None]:
encoded_data = autoencoder.encoder(anomalous_test_data).numpy()
decoded_data = autoencoder.decoder(encoded_data).numpy()

plt.plot(anomalous_test_data[0], 'b')
plt.plot(decoded_data[0], 'r')
plt.fill_between(np.arange(188), decoded_data[0], anomalous_test_data[0], color='lightcoral')
plt.legend(labels=["Input", "Reconstruction", "Error"])
plt.show()

In [None]:
reconstructions = autoencoder.predict(normal_train_data)
train_loss = tf.keras.losses.mae(reconstructions, normal_train_data)

plt.hist(train_loss[None,:], bins=50)
plt.xlabel("Train loss")
plt.ylabel("No of examples")
plt.show()

In [None]:
threshold = np.mean(train_loss) + np.std(train_loss)
print("Threshold: ", threshold)

In [None]:
reconstructions = autoencoder.predict(anomalous_test_data)
test_loss = tf.keras.losses.mae(reconstructions, anomalous_test_data)

plt.hist(test_loss[None, :], bins=50)
plt.xlabel("Test loss")
plt.ylabel("No of examples")
plt.show()

In [127]:
def predict(model, data, threshold):
  reconstructions = model(data)
  loss = tf.keras.losses.mae(reconstructions, data)
  return tf.math.less(loss, threshold)

def print_stats(predictions, labels):
  print("Accuracy = {}".format(accuracy_score(labels, predictions)))
  print("Precision = {}".format(precision_score(labels, predictions)))
  print("Recall = {}".format(recall_score(labels, predictions)))

In [None]:
preds = predict(autoencoder, test_data, threshold)
print_stats(preds, test_labels)

In [None]:
def print_stats(predictions, labels):
    # Confusion Matrix
    conf_mat = confusion_matrix(labels, predictions)
    print("Confusion Matrix:")
    print(conf_mat)

    # AUC Calculation
    auc_score = roc_auc_score(labels, predictions)
    print("AUC Score:", auc_score)

    # Additional Metrics
    print("Accuracy:", accuracy_score(labels, predictions))
    print("Precision:", precision_score(labels, predictions))
    print("Recall:", recall_score(labels, predictions))
    print("F1 Score:", f1_score(labels, predictions))  # Calculate and print F1-score

# Generate predictions
preds = predict(autoencoder, test_data, threshold)

# Print statistics
print_stats(preds, test_labels)