# ECG classification

Laurent Cetinsoy - Datadidacte


In [1]:
!pip install PyWavelets



In [2]:
from IPython.display import HTML

## A first naive model by extracting simple features


Your environment contains variables arr, nsr, and chf which respectively contain 10-second recordings of ECG signals extracted from three datasets on PhysioNet: one from a person suffering from arrhythmia, one from a person with a normal heart rhythm, and another from a person with heart failure.


Matplotlib  subplots (or any other library), display these signals on three subfigures (the subplots should be called with the parameter nrows = 3).
Can you find any differences between them?

In [3]:
import numpy as np
import scipy as sp
import pandas as pd

arr = np.loadtxt('arr.txt')
chf = np.loadtxt('chf.txt')
nsr = np.loadtxt('nsr.txt')


We want to extract features from the time series. For that we will use simple statistics.


Create a function named calculate_stats_features(x) that calculates some statistical features of a signal x using standard numpy functions: nanpercentile, nanmean, etc.
calculate_stats_features will return a list of features in this order:

0. Max
1. Min
2. Mean
3. Median
4. Variance

In [4]:
def calculate_stats_features(x):
    return np.array([
        np.nanmax(x, axis=-1),
        np.nanmin(x, axis=-1),
        np.nanmean(x, axis=-1),
        np.nanmedian(x, axis=-1),
        np.nanvar(x, axis=-1)
    ]).T

calculate_stats_features(arr)

array([ 1.375     , -0.59      , -0.31201111, -0.335     ,  0.03966355])



Create a function named `calculate_zero_crossing(x)` that calculates the Zero
Crossing of a signal x.

The zero crossing is defined as the number of times the signal changes sign.
For this, you can use the signbit, diff, and nonzero functions from numpy.


In [5]:
def calculate_zero_crossing(x):
    return np.abs(np.diff(1 * np.signbit(x))).sum(axis=-1)
val = calculate_zero_crossing(arr)
print(val)

22


Create a function named **calculate_rms(x)** that returns the Root Mean Square (RMS) of a signal x. We will use the nanmean function instead of the mean function from numpy.

In [6]:
def calculate_rms(x):
    return np.sqrt(np.mean(np.power(x, 2), axis=-1))

print(calculate_rms(arr))

0.37015467862923346


Create a function named calculate_entropy(x) that calculates the Shannon entropy of a signal x using the entropy function from scipy.stats.

In [7]:
def calculate_entropy(x):
    offset = np.abs(np.min(x, axis=-1))[..., None]
    return sp.stats.entropy(x + offset, axis=-1)

Create a function get_features(x) that combines the features calculated by all previous functions including caculate_stats_features.

In [8]:
def get_features(x):
    features = calculate_stats_features(x)
    zero_crossing = calculate_zero_crossing(x)
    rms = calculate_rms(x)
    entropy = calculate_entropy(x)
    return np.hstack((
        features,
        zero_crossing[...,None],
        rms[...,None],
        entropy[...,None]
    ))

get_features(arr)

array([ 1.375     , -0.59      , -0.31201111, -0.335     ,  0.03966355,
       22.        ,  0.37015468,  8.02023546])

Load the small ecg dataset
Use your fonction get_features create a new dataframe where you have all the feature as X and y as the label.
Train a random forest on it after doing a train test split if the dataset is not too small

In [9]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier

def load_dataset(filepath: str):
    df = pd.read_csv(filepath, header=None)
    df = df.drop([0], axis=1)

    y = df.iloc[1:, 0].to_numpy()
    X = df.iloc[1:, 1:].to_numpy()
    return X, y

def build_train_model(X, y):
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
    clf = RandomForestClassifier(max_depth=2, random_state=0)
    clf.fit(X_train, y_train)
    print(clf.score(X_test, y_test))
    return clf

X, y = load_dataset("./ecg_small_dataset.csv")
X_featured = get_features(X)


build_train_model(X_featured, y)

0.5


Now you have a first pipeline, do the same on the full dataset
Report the train and test loss

In [10]:
X, y = load_dataset("./ECG-laurent.csv")
X_featured = get_features(X)

clf = build_train_model(X_featured, y)

  df = pd.read_csv(filepath, header=None)


0.7575757575757576


try to tweak the model hyperparameter to see if it works

## Fourier transform features

We want now to see if a model using only fourier transform could work.

create a function get_fourier_coefficients(ecg)

In [11]:
def get_fourier_coefficients(ecg):
    return np.abs(sp.fft.fft(ecg))

get_fourier_coefficients(X).shape

(162, 65536)

Using this function create a dataframe df_fourier containing the fourrier transform coefficients and the label

In [12]:
def dataframe_df_fourier(fft):
    columns = [ "coeff_"+ str(i + 1) for i in range(fft.shape[1])]
    return pd.DataFrame(
        data=fft,
        columns=columns
    )

def dataframe_df_features(features):
    columns = ["max", "min", "mean", "median", "variance", "zero crossing", "rms", "entropy"]
    return pd.DataFrame(
        data=features,
        columns=columns
    )



Try to train a model using the Fourrier coefficient

In [13]:
X, y = load_dataset("./ECG-laurent.csv")
X_fourier = dataframe_df_fourier(get_fourier_coefficients(X)).to_numpy()

clf = build_train_model(X_fourier, y)
clf

  df = pd.read_csv(filepath, header=None)


0.6666666666666666


Try to learn a model using both fourrier coefficient and the features from the previous sections. Does it work ?

In [14]:
# X, y = load_dataset("./ECG-laurent.csv")
df_fourier = dataframe_df_fourier(get_fourier_coefficients(X))
df_features = dataframe_df_features(get_features(X))

df_processed = pd.concat([df_features, df_fourier], axis=1)
X_processed = df_processed.to_numpy()

clf = build_train_model(X_processed, y)
clf

0.696969696969697


## Wavelets

We now wants to use another signal decomposition which are called wavelet. Wavelet are a multi-scale function decomposition on a familly of functions generated from what is called a mother wavelet.

Using PyWavelet make a function get_wavelet_coefficients(ecg) that returns the wavelet coefficient of a given ECG


In [15]:
import pywt

def get_wavelet_coefficients(ecg):
    return pywt.dwt(ecg, "db1")

Using the get_wavelet_coefficients, create a dataframe where the features are the coefficients and include the label

In [16]:
def dataframe_df_wavelet(waves):
    columns = [ "coeff_"+ str(i + 1) for i in range(waves[0].shape[1] + waves[1].shape[1])]
    return pd.DataFrame(
        data=np.hstack(waves),
        columns=columns
    )

Train a random forest classifier with such features. DOes the model work

In [17]:
dataframe_df_wavelet(get_wavelet_coefficients(X))

Unnamed: 0,coeff_1,coeff_2,coeff_3,coeff_4,coeff_5,coeff_6,coeff_7,coeff_8,coeff_9,coeff_10,...,coeff_65527,coeff_65528,coeff_65529,coeff_65530,coeff_65531,coeff_65532,coeff_65533,coeff_65534,coeff_65535,coeff_65536
0,-0.041328,-0.023841,-0.191199,-0.341946,-0.552479,-0.552182,-0.557753,-0.267132,-0.122545,-0.207548,...,0.065699,0.016719,0.053518,0.021233,0.014724,-0.040096,-0.045579,-0.042910,0.038852,0.117287
1,-0.790911,-0.865018,-0.766844,-0.698250,-0.643461,-0.445871,-0.383873,-0.578744,-0.799402,-0.840409,...,-0.010720,-0.015836,-0.018014,-0.048044,-0.036967,-0.004891,0.054351,0.036088,-0.000337,-0.029607
2,-0.272506,-0.295406,-0.265384,-0.228808,-0.240806,-0.268648,-0.359485,0.239197,0.775566,0.999685,...,-0.023410,0.025392,0.068484,0.084473,0.207043,-0.089120,-0.153329,-0.082890,-0.055987,-0.014258
3,0.139769,0.139784,0.109061,0.095343,0.098507,0.027239,-0.254103,-0.276091,0.393507,1.676674,...,-0.017118,0.011268,0.044394,0.175267,-0.045659,-0.127169,-0.135989,0.065010,0.080593,0.014107
4,-0.636040,-0.661424,-0.402270,-0.315595,-0.255518,-0.172525,-0.113160,-0.014918,0.132438,0.232168,...,0.013890,0.004778,-0.050799,-0.437964,0.558049,-0.084620,0.043570,-0.033640,-0.107628,-0.159106
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
157,0.127818,-0.311521,-1.459113,-1.831337,-1.392302,-0.802171,-0.408950,-0.214244,-0.005426,0.141853,...,-0.079195,-0.046403,-0.056491,-0.031241,-0.029014,-0.020844,0.005015,-0.011970,0.002185,0.006545
158,-0.512205,-0.570402,-0.569971,-0.521425,-0.480995,-0.447012,-0.444562,-0.439274,-0.447098,-0.463224,...,-0.024171,0.017110,0.000482,-0.025721,-0.000338,-0.018288,-0.005972,-0.007668,-0.016222,-0.019366
159,-0.131898,-0.115320,-0.134718,-0.196943,-0.212998,-0.219617,-0.225862,-0.227732,-0.234253,-0.221944,...,-0.020154,-0.010923,-0.031767,-0.007734,0.033134,-0.004324,0.016074,0.000579,0.005376,-0.000354
160,0.421908,0.377787,-0.137546,-0.607688,-0.925552,-0.945349,-1.016995,-0.864036,-0.748605,-0.618940,...,-0.057088,-0.065876,-0.015059,0.023097,0.015839,0.014198,0.008636,0.030311,0.054237,0.060466


Add one or several of the previous feature functions and try to train another model

In [18]:
df_wavelets = dataframe_df_wavelet(get_wavelet_coefficients(X))
df_features = dataframe_df_features(get_features(X))

df_processed = pd.concat([df_features, df_wavelets], axis=1)
X_processed = df_processed.to_numpy()

clf = build_train_model(X_processed, y)
clf

0.8484848484848485


Specify the methodology you used to train the model and report the various attempts results into a table

## Deep learning (1D CNN)

Now we want to see if we can skip all theses feature engineering techniques !
Design and train a multi-layer one dimensional CNN using the raw ECG signal as features.


Could you reach or surpass the feature based models ?

In [19]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense, Dropout

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

X_train = X_train.reshape((X_train.shape[0], X_train.shape[1], 1))
X_test = X_test.reshape((X_test.shape[0], X_test.shape[1], 1))

model = Sequential()
model.add(Conv1D(filters=32, kernel_size=3, activation='relu', input_shape=(X_train.shape[1], 1)))
model.add(MaxPooling1D(pool_size=2))

model.add(Conv1D(filters=64, kernel_size=3, activation='relu'))
model.add(MaxPooling1D(pool_size=2))

model.add(Conv1D(filters=128, kernel_size=3, activation='relu'))
model.add(MaxPooling1D(pool_size=2))

model.add(Flatten())

model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))

model.add(Dense(1, activation='sigmoid'))

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

history = model.fit(X_train, y_train, epochs=20, batch_size=32, validation_split=0.2)

test_loss, test_acc = model.evaluate(X_test, y_test)

print(f'Test Accuracy: {test_acc:.4f}')

2024-10-16 14:56:36.797062: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
  super().__init__(


Epoch 1/20
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 2s/step - accuracy: 0.6186 - loss: -270.5398 - val_accuracy: 0.5385 - val_loss: 13.0215
Epoch 5/20
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 2s/step - accuracy: 0.5905 - loss: -613.8932 - val_accuracy: 0.5385 - val_loss: -28.1925
Epoch 6/20
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 2s/step - accuracy: 0.6488 - loss: -946.1343 - val_accuracy: 0.5385 - val_loss: -203.6011
Epoch 7/20
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 2s/step - accuracy: 0.5926 - loss: -2754.9617 - val_accuracy: 0.5385 - val_loss: -307.8959
Epoch 8/20
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m8s[0m 2s/step - accuracy: 0.6249 - loss: -4278.2412 - val_accuracy: 0.5385 - val_loss: -524.3863
Epoch 9/20
[1m4/4[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m9s[0m 2s/step - accuracy: 0.5936 - loss: -9020.4453 - val_accuracy: 0.5385 - val_loss: -1011.4009
Epoch 10/20
[1m4/4[