# ECG Beat Detector Comparison
## Important Variables
Important variables used throughout the code can be changed here.
- `segmentation_window_size`: every signal in a database is sliced into pieces of `segmentation_window_size` seconds to make the signals more comparable
- `tolerance_window_size`: to determine wether a QRS complex was determined correctly a tolerance window of ±`tolerance_window_size` milliseconds is used

In [83]:
segmentation_window_size = 10 # in seconds, default is 10
tolerance_window_size = 150 # in milliseconds, default is 150

## Import Libraries & Start Engines
Install all important libraries with pip apart from `matlab.engine` which has to be installed from the MATLAB root folder. You can find more information on instlaling the `matlab.engine` [here](https://de.mathworks.com/help/matlab/matlab_external/install-the-matlab-engine-for-python.html)

In [84]:
import matlab.engine
import numpy as np
from cProfile import label
import matplotlib.pyplot as plt
%matplotlib widget
import pathlib
from ecgdetectors import Detectors
import sys
import wfdb
from wfdb import processing
import pandas
import os
from detectors.visgraphdetector import VisGraphDetector

from os import listdir
from os.path import isfile, join
current_working_directory = os.getcwd()

Start the `matlab.engine` to be able to execute MATLAB commands or functions from within Python.

In [168]:
eng = matlab.engine.start_matlab()

## Generate Synthetic Signal
Some initial parameters for the generation of the synthetic signals. Here it is possible to choose diffent types of artificial noise and also choose whether the signal should be real or synthetic.

In [86]:
# ---Initial parameters---
rrLength = 50       # A desired ECG signal length (the number of RR intervals) 
APBrate = 0.10      # Rate of atrial premature beats (APB). A number between 0 and 0.5
onlyRR = 0          # 1 - only RR intervals are generated, 0 - multilead ECG is generated

medEpis = 15        # Median duration of an atrial fibrillation (AF) episode
stayInAF = float(1-np.log(2)/medEpis)   # Probability to stay in AF state
AFburden = 0.8      # AF burden. 0 - the entire signal is sinus rhythm (SR), 1 - the entire signal is AF

noiseType = 4       # Type of noise. A number from 0 to 4. 0 - no noise added (noise RMS = 0 mV), 
                    # 1 - motion artefacts, 2 - electrode movement artefacts, 3 - baseline wander, 
                    # 4 - mixture of type 1, type 2 and type 3 noises
noiseRMS = 0.02     # Noise level in milivolts 

realRRon = 1       # 1 - real RR series are used, 0 - synthetic
realVAon = 1       # 1 - real ventricular activity is used, 0 - synthetic
realAAon = 1       # 1 - real atrial activity is used, 0 - synthetic
# Note: cannot select real atrial activity and synthetic ventricular activity

In the next field the signal is generated.

In [87]:
signal_generator_path = eng.genpath('C:/Users/flori\OneDrive\Dokumente\TU\Bachelor Thesis\Code\Signal_generator')
eng.addpath(signal_generator_path, nargout=0)

returndata = eng.simPAF_ECG_generator(rrLength, realRRon, realVAon, realAAon, AFburden, stayInAF, APBrate, noiseType, noiseRMS, onlyRR)

## Define Classes and Functions
Function to determine the peaks in a signal.

In [120]:
def predict_peaks(segment, detector):

    if isinstance(segment, RecordingSegment):
        signal = segment.Signal
        fs = segment.Fs

    elif isinstance(segment, Recording):
        signal = segment.WholeSignal
        fs = segment.Fs

    else:
        raise Exception('You have to input either a RecordingSegment or a Recording as the segment.')
        
    
    if detector.short_name == "match_fil" and (fs != 250 and fs != 360):
        print(detector.short_name, "could not run because the sample rate is wrong and was skipped")
        return []

    try:
        predicted_peaks = detector.predicted_qrs_compelx(signal=signal, fs=fs)
    except IndexError:
        print(detector.short_name, "failed due to an index error and was skipped")
        return []
    
    return predicted_peaks

In [89]:
def binary_classification(segment, predicted_peaks):

    if isinstance(segment, RecordingSegment):
        signal = segment.Signal
        fs = segment.Fs
        actual_peaks = segment.Actual_Qrs_Complex

    elif isinstance(segment, Recording):
        signal = segment.WholeSignal
        fs = segment.Fs
        actual_peaks = segment.WholeActual_Qrs_Complex

    else:
        raise Exception('You have to input either a RecordingSegment or a Recording as the segment.')

    pp = len(predicted_peaks)
    tp = 0
    fp = 0
    fn = 0
    
    actual_peaks_iter = actual_peaks

    for predicted_peak in predicted_peaks:
        tpdetect = 1
        for i in range(len(actual_peaks_iter)):
            if predicted_peak >= (actual_peaks_iter[i] - tolerance_window_size) and predicted_peak <= (actual_peaks_iter[i] + tolerance_window_size):
                tp+=1
                tpdetect = 0
                actual_peaks_iter = np.delete(actual_peaks_iter, i)
                break
        if tpdetect:
            fp+=1

    for actualpeak in actual_peaks_iter:
        fn+=1

    return pp, tp, fp, fn

In [142]:
def score_calculation(tp, fp, fn):
    try:
        sensitivity = tp / (tp+fn)
    except ZeroDivisionError:
        sensitivity = 0
    try:
        positive_predictivity = tp / (tp+fp)
    except ZeroDivisionError:
        positive_predictivity = 0
    try:    
        f1_score = tp / (tp+.5*(fp+fn))
    except ZeroDivisionError:
        f1_score = 0
    return sensitivity, positive_predictivity, f1_score

The necessary classes for the detectors and the databases are created.

In [91]:
class Detector():
    def __init__(self, name, short_name, algorithm) -> None:
        self.name = name
        self.algorithm = algorithm
        self.short_name = short_name
    
    def predicted_qrs_compelx(self, signal, fs):
        return self.algorithm(signal, fs)

    def name(self):
        return self.name

    def short_name(self):
        return self.short_name

class Database():
    def __init__(self, Name, Users, Fs) -> None:
        self.Name = Name
        self.Users = Users
        self.Fs = Fs

    def Name(self):
        return self.Name

    def Users(self):
        return self.Users

    def Fs(self):
        return self.Fs

class Evaluation():
    def __init__(self, RecordingSegment, Detector, predict_peaks, binary_classification, score_calculation) -> None:
        self.RecordingSegment = RecordingSegment
        self.Detector = Detector
        self.predict_peaks = predict_peaks
        self.binary_classification = binary_classification
        self.score_calculation = score_calculation
        self.predicted_peaks = None
        self.pp = None
        self.tp = None
        self.fp = None
        self.fn = None
        self.sensitivity = None
        self.positive_predictivity = None
        self.f1_score = None
    
    def calculate(self):
        self.predicted_peaks = self.predict_peaks(self.RecordingSegment,self.Detector)
        self.pp, self.tp, self.fp, self.fn = self.binary_classification(self.RecordingSegment, self.predicted_peaks)
        self.sensitivity, self.positive_predictivity, self.f1_score = self.score_calculation(self.tp, self.fp, self.fn)
        return self

class RecordingSegment():
    def __init__(self, Signal, Actual_Qrs_Complex, Fs) -> None:
        self.Signal = Signal
        self.Actual_Qrs_Complex = Actual_Qrs_Complex
        self.Fs = Fs
        self.Evaluations = []

    def Signal(self):
        return self.Signal

    def Actual_Qrs_Complex(self):
        return self.Actual_Qrs_Complex

    def Fs(self):
        return self.Fs

    def Evaluation(self, detectors, predict_peaks, binary_classification, score_calculation):
        evaluations = []
        for detector in detectors:
            evaluations.append(Evaluation(self,detector,predict_peaks, binary_classification, score_calculation).calculate())
        return evaluations

class Recording():
    def __init__(self, RecordingName, RecordingSegments, WholeSignal, WholeActual_Qrs_Complex, Fs) -> None:
        self.RecordingName = RecordingName
        self.RecordingSegments = RecordingSegments
        self.WholeSignal = WholeSignal
        self.WholeActual_Qrs_Complex = WholeActual_Qrs_Complex
        self.Fs = Fs

    def RecordingName(self):
        return self.RecordingName
    
    def RecordingSegments(self):
        return self.RecordingSegments

    def WholeSignal(self):
        return self.WholeSignal
    
    def WholeActual_Qrs_Complex(self):
        return self.WholeActual_Qrs_Complex

    def Fs(self):
        return self.Fs

class User():
    def __init__(self, UserName,Recordings) -> None:
        self.UserName = UserName
        self.Recordings = Recordings

    def UserName(self):
        return self.UserName

    def Recordings(self):
        return self.Recordings

Function to split signals in smaller parts including the respecitve qrs complexes. `split_signal` returns an array of arrays.

In [167]:
def split_signal(signal, fs, actual_qrs_complexes):
    signal = np.array(signal)
    actual_qrs_complexes = np.array(actual_qrs_complexes)
    split = [[] for i in range(((len(signal)//(fs*segmentation_window_size)*2)-1))]

    for split_signal_counter in range((len(signal)//(fs*segmentation_window_size)*2)-1):
        min_index = split_signal_counter*(fs*segmentation_window_size//2)
        max_index = split_signal_counter*(fs*segmentation_window_size//2)+fs*segmentation_window_size-1

        split[split_signal_counter].append(signal[min_index:max_index])

        min_counter = 0
        max_counter = 0

        for qrs_complex_counter in range(len(actual_qrs_complexes)):
            if actual_qrs_complexes[qrs_complex_counter] < min_index:
                min_counter += 1
            if actual_qrs_complexes[qrs_complex_counter] < max_index:
                max_counter += 1
        split[split_signal_counter].append(actual_qrs_complexes[min_counter:max_counter]-min_index)

    return split


Create arrays to safe the detectors and databases.

In [203]:
detectors = []
databases = []

## Add Databases
Create a `Database` object for `telehealth_environment_database` including all sub objects necessary to initialize it.

In [170]:
telehealth_path = join(current_working_directory,'databases/telehealth')
telehealth_files = [f for f in listdir(telehealth_path) if isfile(join(telehealth_path, f))]

telehealth_fs = 500
users = []
recordings = []
for file in telehealth_files:
    recordingsegments = []
    data = pandas.read_csv(join(telehealth_path, file),sep=",",header=None)
    signal = np.array(data[0]).astype(float)
    qrs_complex_indices = np.array(data[1]).astype(int)
    actual_qrs_complexes = []
    
    for indexcounter in range(len(qrs_complex_indices)):
        if qrs_complex_indices[indexcounter]:
            actual_qrs_complexes.append(indexcounter)
    
    splits = split_signal(signal=signal,fs=telehealth_fs, actual_qrs_complexes=np.array(actual_qrs_complexes).astype(int))
    
    for split in splits:
        recordingsegments.append(RecordingSegment(Signal=split[0], Actual_Qrs_Complex=split[1], Fs=telehealth_fs))
    recordings.append(Recording(RecordingName=str(file),RecordingSegments = recordingsegments,WholeSignal=signal,WholeActual_Qrs_Complex=actual_qrs_complexes,Fs=telehealth_fs))
users.append(User(UserName="default",Recordings = recordings))   
        
telehealth_environment_database = Database(
    Name="Telehealth Test Database",
    Users=users,
    Fs=telehealth_fs)

Create a `Database` object for `synth_database` including all sub objects necessary to initialize it.

In [171]:
synth_fs = 500
signal = np.transpose(np.array(returndata['multileadECG']))[:,0]
actual_qrs_complexes = np.transpose(np.array(returndata['QRSindex']))[:,0].astype(int)

splits = split_signal(signal=signal,fs=synth_fs,actual_qrs_complexes=actual_qrs_complexes)

recordingsegments = []
for split in splits:
    recordingsegments.append(RecordingSegment(Signal=split[0],Actual_Qrs_Complex=split[1],Fs=synth_fs))
recordings = [Recording("default",RecordingSegments=recordingsegments, WholeSignal=signal,WholeActual_Qrs_Complex=actual_qrs_complexes,Fs=synth_fs)]
users = [User(UserName="default",Recordings=recordings)]

synth_database = Database(
    Name="Synthetic data", 
    Users=users,
    Fs=synth_fs)

Create a `Database` object for `wfdb_test_database` including all sub objects necessary to initialize it.

In [172]:
wfdb_fs = wfdb.rdrecord('sample-data/100', sampfrom=0, sampto=10000, channels=[0]).fs
signal = np.array(wfdb.rdrecord('sample-data/100', sampfrom=0, sampto=10000, channels=[0]).p_signal[:,0])
actual_qrs_complexes = np.array(wfdb.rdann('sample-data/100','atr', sampfrom=0, sampto=10000).sample[1:]).astype(int)

splits = split_signal(signal=signal, fs=wfdb_fs, actual_qrs_complexes=actual_qrs_complexes)

recordingsegments = []
for split in splits:
    recordingsegments.append(RecordingSegment(Signal=split[0],Actual_Qrs_Complex=split[1],Fs=wfdb_fs))
recordings = [Recording("default",RecordingSegments=recordingsegments, WholeSignal=signal,WholeActual_Qrs_Complex=actual_qrs_complexes,Fs=wfdb_fs)]
users = [User(UserName="default",Recordings=recordings)]

wfdb_test_database = Database(
    Name="WFDB Test Database",
    Users=users,
    Fs=wfdb_fs)

Add data from different databases to `databases`.

In [186]:
databases.append(synth_database)
#databases.append(wfdb_test_database)
#databases.append(telehealth_environment_test_database)

## Add Detectors
Add detectors from different locations in the standaradized format to the detectors list.

Add gqrs detector to the detectors list in the standard format.

In [187]:
detectors.append(Detector(name="GQRS", short_name="gqrs", algorithm=processing.qrs.gqrs_detect))

Create function and add jqrs detector to the detectors list in the standard format.

In [188]:
jqrs_algo_path = eng.genpath('C:/Users/flori\OneDrive\Dokumente\TU\Bachelor Thesis\Code\Playground\detectors\jqrs')
eng.addpath(jqrs_algo_path, nargout=0)

def run_jqrs_detector(signal, fs):
    threshold = 0.6 # energy threshold of the detector in au, default = 0.6
    ref_period = 0.250 # refractory period in sec between two R-peaks in ms, default = 0.250
    newsignal = [[i] for i in signal]
    return np.array(eng.qrs_detect2(matlab.double(newsignal), threshold, ref_period, matlab.double(fs)))[0].astype(int)

detectors.append(Detector(name="JQRS", short_name="jqrs", algorithm=run_jqrs_detector))

Create function and add visgraphdetector detector to the detectors list in the standard format.

In [189]:
def run_visgraph_detector(signal, fs):
    beta = 0.55 # beta, default = 0.55
    gamma = 0.5 # gamm, default = 0.5
    lowcut = 4 # lowcut, default = 4
    R_peaks, weights, weighted_signal = VisGraphDetector(fs).visgraphdetect(signal, beta=beta, gamma=gamma, lowcut=lowcut, M = 2*fs)
    return R_peaks

detectors.append(Detector(name="VisGraphDetector", short_name="visgraph", algorithm=run_visgraph_detector))

Create function and add rpeakdetect detector to the detectors list in the standard format.

In [190]:
rpeakdetect_path = eng.genpath('C:/Users/flori\OneDrive\Dokumente\TU\Bachelor Thesis\Code\Playground\detectors')
eng.addpath(rpeakdetect_path, nargout=0)

def run_rpeakdetect_detector(signal, fs):
    threshhold = 0.2 # default = 0.2
    testmode = 0 # default = 0
    newsignal = [[i] for i in signal]
    return np.array(eng.rpeakdetect(matlab.double(newsignal), matlab.double(fs),threshhold,testmode)['R_index'])[0].astype(int)

detectors.append(Detector(name="rpeakdetect", short_name="rpeak", algorithm=run_rpeakdetect_detector))

Create function and add r_deco detector to the detectors list in the standard format.

In [191]:
r_deco_path = eng.genpath('C:/Users/flori\OneDrive\Dokumente\TU\Bachelor Thesis\Code\Playground\detectors/r_deco')
eng.addpath(r_deco_path, nargout=0)

def run_r_deco_detector(signal, fs):
    envelope_size = 300.0 # envelope size in ms, default = 300.0
    average_heart_rate = 100.0 # average heart rate in bpm, default = 100.0
    post_processing = 1.0 # post processing where 1.0 means yes, default = 1.0
    ectopic_removal = 0.0 # ectopic removal where 1.0 means yes, default = 0.0
    inverted_signal = 0.0 # inverted signal where 1.0 means yes, default = 0.0
    parameters_check = 0.0 # parameters check in UI where 1.0 means yes, default = 0.0
    newsignal = [[i] for i in signal]
    return np.array(eng.peak_detection([envelope_size,average_heart_rate,post_processing,ectopic_removal,inverted_signal],matlab.double(newsignal), matlab.double(fs),parameters_check)).astype(int)[0][0]

detectors.append(Detector(name="r_deco", short_name="rdeco", algorithm=run_r_deco_detector))

Create function and add unsw detector to the detectors list in the standard format.

In [192]:
unsw_path = eng.genpath('C:/Users/flori\OneDrive\Dokumente\TU\Bachelor Thesis\Code\Databases\Telehealth')
eng.addpath(unsw_path, nargout=0)

def run_unsw_detector(signal, fs):
    mask = [] # mask could be implemented later if wanted
    plotting = 0 # 1.0 for ploting intermediate signals, 0.0 for no plotting, default = 0.0
    newsignal = [[i] for i in signal]
    return np.array(eng.UNSW_QRSDetector(matlab.double(newsignal), matlab.double(fs),matlab.double(mask),matlab.double(plotting))).astype(int)[0]

#detectors.append(Detector(name="UNSW_QRSDetector", short_name="unsw", algorithm=run_unsw_detector))

Create functions to get the detectors from the `ecgdetectors` package into the right standard format to be able to iterate through them.

In [193]:
def run_two_average_detector(signal, fs):
    return Detectors(fs).two_average_detector(unfiltered_ecg=signal)

def run_matched_filter_detector(signal, fs):
        return Detectors(fs).matched_filter_detector(unfiltered_ecg=signal)

def run_swt_detector(signal, fs):
    return Detectors(fs).swt_detector(unfiltered_ecg=signal)

def run_engzee_detector(signal, fs):
    return Detectors(fs).engzee_detector(unfiltered_ecg=signal)

def run_christov_detector(signal, fs):
    return Detectors(fs).christov_detector(unfiltered_ecg=signal)

def run_hamilton_detector(signal, fs):
    return Detectors(fs).hamilton_detector(unfiltered_ecg=signal)

def run_pan_tompkins_detector(signal, fs):
    return Detectors(fs).pan_tompkins_detector(unfiltered_ecg=signal)

def run_wqrs_detector(signal, fs):
    return Detectors(fs).wqrs_detector(unfiltered_ecg=signal)

Add detectors from the `ecgdetectors` package to the detectors list.

In [194]:
detectors.append(Detector(name="Elgendi et al (Two average)", short_name="two_avg", algorithm=run_two_average_detector))
detectors.append(Detector(name="Matched filter", short_name="match_fil", algorithm=run_matched_filter_detector))
detectors.append(Detector(name="Kalidas & Tamil (Wavelet transform)", short_name="swt", algorithm=run_swt_detector))
detectors.append(Detector(name="Engzee", short_name="engz", algorithm=run_engzee_detector))
detectors.append(Detector(name="Christov", short_name="christ", algorithm=run_christov_detector))
detectors.append(Detector(name="Hamilton", short_name="hamilt", algorithm=run_hamilton_detector))
detectors.append(Detector(name="Pan Tompkins", short_name="pan_tomp", algorithm=run_pan_tompkins_detector))
detectors.append(Detector(name="WQRS", short_name="wqrs", algorithm=run_wqrs_detector))

## Predict QRS Complexes
All algorithms are run and the resulting QRS complexes are saved in vectors.

In [202]:
for database in databases:
    for user in database.Users:
        for recording in user.Recordings:
            sensitivity = []
            positive_predictivity = []
            f1_score = []
            for recordingsegment in recording.RecordingSegments:
                evaluations = recordingsegment.Evaluation(detectors, predict_peaks, binary_classification, score_calculation)
                sensitivity_iter = [1 for evalunation in evaluations]
                positive_predictivity_iter = [1 for evalunation in evaluations]
                f1_score_iter = [1 for evalunation in evaluations]
                for i in range(len(evaluations)):
                    if evaluations[i].sensitivity < sensitivity_iter[i]: sensitivity_iter[i] = evaluations[i].sensitivity
                    if evaluations[i].positive_predictivity < positive_predictivity_iter[i]: positive_predictivity_iter[i] = evaluations[i].positive_predictivity
                    if evaluations[i].f1_score < f1_score_iter[i]: f1_score_iter[i] = evaluations[i].f1_score

                sensitivity = sensitivity_iter
                positive_predictivity = positive_predictivity_iter
                f1_score = f1_score_iter

            data = [np.around(sensitivity,decimals=2), np.around(positive_predictivity,decimals=2),np.around(f1_score,decimals=2)]
            rows = ["sensitiv", "pos pred", "f1 score"]
            columns = [detector.short_name for detector in detectors]

            print(pandas.DataFrame(data, rows, columns))
                    

match_fil could not run because the sample rate is wrong and was skipped
match_fil could not run because the sample rate is wrong and was skipped
match_fil could not run because the sample rate is wrong and was skipped
match_fil could not run because the sample rate is wrong and was skipped
match_fil could not run because the sample rate is wrong and was skipped
match_fil could not run because the sample rate is wrong and was skipped
match_fil could not run because the sample rate is wrong and was skipped
match_fil could not run because the sample rate is wrong and was skipped
match_fil could not run because the sample rate is wrong and was skipped
match_fil could not run because the sample rate is wrong and was skipped
match_fil could not run because the sample rate is wrong and was skipped
match_fil could not run because the sample rate is wrong and was skipped
match_fil could not run because the sample rate is wrong and was skipped
          gqrs  jqrs  visgraph  rpeak  rdeco  two_a