* Imports and definitions

In [289]:
import plotly.graph_objects as go
from scipy import *
import numpy as np
import pywt
import pandas as pd
from sklearn.utils import shuffle
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, ConfusionMatrixDisplay


samplingRate = 360  # Hz (given in the doc)

In [290]:
def draw(amplitudes, indices=None):
    if type(indices) == "list":  # if we have certain indices or not
        fig_cont = go.Figure()
        fig_cont.add_trace(
            go.Scatter(x=indices, y=amplitudes, mode="lines", name="Continuous Signal")
        )
    else:
        fig_cont = go.Figure()
        fig_cont.add_trace(
            go.Scatter(y=amplitudes, mode="lines", name="Continuous Signal")
        )
    fig_cont.update_layout(
        title="Continuous Signal",
        xaxis_title="Index",
        yaxis_title="Amplitude",
        width=1000,
        height=500,
    )

    fig_cont.show()  # For notebooks

In [291]:
# HB means HeartBeat
def readHB(fileName):
    signals = []
    with open(fileName) as l:
        line = l.readline()
        while line:
            signal = line.split("|")
            signals.append([float(value) for value in signal[:-1]])
            line = l.readline()
    return signals

In [292]:
dirPath = "C:/Users/omart/Desktop/level 2/python/ECG-Classification/Data"

In [293]:
healthySignals = readHB(f"{dirPath}/Normal_Train.txt")

In [294]:
PVCSignals = readHB(f"{dirPath}/PVC_Train.txt")

## After some investigating , I noticed the following:
* healthySignals and PVCSignals each contain 200 signals
* Each signal has 300 points
* Max amplitude is 1.6692
* Min amplitude is -2.1939

In [295]:
draw(healthySignals[104])  # drawing a random signal to see what we are working with

In [296]:
draw(PVCSignals[180])

### There's a huge difference between the healthy and the PVC , hopefully the classification won't be hard

### Applying the Band Pass filter with cutoffs = [0.5,40]

In [297]:
b, a = signal.butter(
    N=4, Wn=[0.5, 40], btype="bandpass", fs=samplingRate
)  # getting the filter coofecients

In [298]:
filteredHealthy = [signal.lfilter(b, a, s) for s in healthySignals]
filteredPVC = [signal.lfilter(b, a, s) for s in PVCSignals]
filteredHealtyTest = [signal.lfilter(b, a, s) for s in readHB(f"{dirPath}/Normal_Test.txt")]
filteredPVCTest = [signal.lfilter(b, a, s) for s in readHB(f"{dirPath}/PVC_Test.txt")]

### The effect of filtering is very clear. Removed all unneeded noise

In [299]:
draw(filteredHealthy[104])

### Now will apply DWT and take the details coefficients because they contain the difference between normal and PVC signals

In [300]:
level = 2
healthyDetails = [pywt.wavedec(signal, "db4", level = level)[0] for signal in filteredHealthy]
PVCDetails = [pywt.wavedec(signal, "db4", level = level)[0] for signal in filteredPVC]
healthyTestDetails = [pywt.wavedec(signal, "db4", level = level)[0] for signal in filteredHealtyTest]
PVCTestDetails = [pywt.wavedec(signal, "db4", level = level)[0] for signal in filteredPVCTest]
#draw(pywt.wavedec(filteredHealthy[0], "db4", level = level)[0])

In [301]:
draw(healthyDetails[0])

In [302]:
draw(PVCDetails[0])

In [303]:
draw(healthyTestDetails[0])

### We can see that the difference between them is noticeable 

### We will use min-max scaling so the signal would range from [0,1]

In [304]:
def normalize(signal):
    return (signal - np.min(signal)) / (np.max(signal) - np.min(signal))

In [305]:
normalizedHealthy = [normalize(signal) for signal in healthyDetails]
normalizedPVC = [normalize(signal) for signal in PVCDetails]
normalizedHealthyTest = [normalize(signal) for signal in healthyTestDetails]
normalizedPVCTest = [normalize(signal) for signal in PVCTestDetails]

### A small comparison between processed signal vs raw signal

In [306]:
draw(PVCSignals[0])

In [307]:
draw(normalizedPVC[0])

* PVC : 1
* Healthy : 0

In [308]:
### Creating the DF and shuffling so its ready for training

In [309]:
data = normalizedHealthy + normalizedPVC
labels = list(np.zeros(200)) + list(np.ones(200))

df = pd.DataFrame({"x": data, "y": labels})
df = shuffle(df).reset_index(drop=True)

In [310]:
df

Unnamed: 0,x,y
0,"[0.22958304123918333, 0.2259102537110092, 0.23...",0.0
1,"[0.17908302574376475, 0.2078341665526546, 0.15...",0.0
2,"[0.5119925187487299, 0.5457954856016585, 0.488...",1.0
3,"[0.16911641046305448, 0.2707793782714757, 0.09...",1.0
4,"[0.22391676347262224, 0.2202542989147631, 0.22...",0.0
...,...,...
395,"[0.5255772321777898, 0.5778606513483424, 0.489...",1.0
396,"[0.1831807166588089, 0.1974372096408312, 0.173...",0.0
397,"[0.10883185594091244, 0.15561426092102643, 0.0...",0.0
398,"[0.5135985851485878, 0.544875238454359, 0.4919...",1.0


In [311]:
### Creating the TestDF and shuffling so its ready for testing

In [312]:
testData = normalizedHealthyTest + normalizedPVCTest
testLabels = list(np.zeros(200)) + list(np.ones(200))

testDF = pd.DataFrame({"x": testData, "y": testLabels})
testDF = shuffle(df).reset_index(drop=True)

### Start training

In [313]:
knn = KNeighborsClassifier(n_neighbors=5)
knn.fit(list(df["x"]),list(df["y"]))

### Predict data and Measuring accuracy

In [314]:
yPredict = knn.predict(list(testDF['x']))
yPredict

array([1., 0., 1., 0., 1., 1., 0., 1., 1., 0., 0., 0., 1., 0., 0., 0., 0.,
       1., 0., 1., 1., 0., 1., 0., 0., 1., 0., 0., 0., 1., 1., 0., 0., 0.,
       1., 0., 0., 1., 0., 1., 1., 1., 0., 1., 1., 0., 1., 0., 1., 0., 1.,
       0., 0., 0., 1., 0., 0., 0., 0., 1., 0., 0., 1., 0., 1., 1., 0., 0.,
       0., 0., 1., 1., 1., 1., 1., 0., 1., 1., 1., 0., 1., 0., 1., 1., 0.,
       1., 0., 1., 1., 0., 1., 0., 0., 1., 0., 1., 0., 1., 1., 1., 1., 1.,
       1., 1., 1., 0., 1., 0., 1., 1., 0., 0., 0., 0., 0., 1., 1., 0., 0.,
       0., 0., 1., 1., 1., 1., 1., 0., 0., 1., 0., 0., 1., 1., 1., 0., 1.,
       0., 0., 1., 0., 1., 0., 1., 1., 0., 1., 0., 1., 1., 0., 1., 0., 1.,
       0., 1., 0., 0., 1., 0., 1., 1., 1., 1., 1., 1., 1., 1., 1., 0., 0.,
       1., 1., 0., 1., 1., 0., 0., 1., 0., 0., 0., 0., 0., 1., 1., 0., 0.,
       1., 1., 1., 1., 0., 0., 1., 1., 1., 0., 1., 1., 1., 1., 0., 1., 0.,
       1., 0., 1., 1., 1., 0., 0., 1., 1., 1., 1., 0., 0., 0., 0., 0., 1.,
       1., 0., 1., 0., 0.

In [315]:
yTest = list(testDF['y'])
accuracy_score(yTest, yPredict)

1.0

In [316]:
print(classification_report(yTest, yPredict))

              precision    recall  f1-score   support

         0.0       1.00      1.00      1.00       200
         1.0       1.00      1.00      1.00       200

    accuracy                           1.00       400
   macro avg       1.00      1.00      1.00       400
weighted avg       1.00      1.00      1.00       400



In [317]:
def predict_signal_from_file(file_path, knn_model):

        input_signal = readHB(file_path)
        filtered_signal = signal.lfilter(b, a, input_signal)
        print("before normalization")
        draw(filtered_signal[0])
        detail_coeffs = pywt.wavedec(filtered_signal, "db4",level=level)[0]
        normalized_signal = normalize(detail_coeffs)
        print("after normalization")
        draw(normalized_signal[0])
        prediction = knn_model.predict(normalized_signal)[0]

        return "Normal Heartbeat" if prediction == 0 else "PVC"
    


In [318]:
test_file_path = f"{dirPath}/PVCECGSig_150.txt"
result = predict_signal_from_file(test_file_path, knn)
print(f"The signal in this file is classified as: {result}")

before normalization


after normalization


The signal in this file is classified as: PVC
