<link rel="stylesheet" href="../../styles/theme_style.css">
<!--link rel="stylesheet" href="../../styles/header_style.css"-->
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css">

<table width="100%">
    <tr>
        <td id="image_td" width="15%" class="header_image_color_5"><div id="image_img" class="header_image_5"></div></td>
        <!-- Available classes for "image_td" element:
        - header_image_color_1 (For Notebooks of "Open" Area);
        - header_image_color_2 (For Notebooks of "Acquire" Area);
        - header_image_color_3 (For Notebooks of "Visualise" Area);
        - header_image_color_4 (For Notebooks of "Process" Area);
        - header_image_color_5 (For Notebooks of "Detect" Area);
        - header_image_color_6 (For Notebooks of "Extract" Area);
        - header_image_color_7 (For Notebooks of "Decide" Area);
        - header_image_color_8 (For Notebooks of "Explain" Area);

        Available classes for "image_img" element:
        - header_image_1 (For Notebooks of "Open" Area);
        - header_image_2 (For Notebooks of "Acquire" Area);
        - header_image_3 (For Notebooks of "Visualise" Area);
        - header_image_4 (For Notebooks of "Process" Area);
        - header_image_5 (For Notebooks of "Detect" Area);
        - header_image_6 (For Notebooks of "Extract" Area);
        - header_image_7 (For Notebooks of "Decide" Area);
        - header_image_8 (For Notebooks of "Explain" Area);-->
        <td class="header_text"> Event Detection - R Peaks (ECG) </td>
    </tr>
</table>

<div id="flex-container">
    <div id="diff_level" class="flex-item">
        **Difficulty Level:**   <span class="fa fa-star checked"></span>
                                <span class="fa fa-star checked"></span>
                                <span class="fa fa-star checked"></span>
                                <span class="fa fa-star"></span>
                                <span class="fa fa-star"></span>
    </div>
    <div id="tag" class="flex-item-tag">
        <span id="tag_list">
            <table id="tag_list_table">
                <tr>
                    <td class="shield_left">Tags</td>
                    <td class="shield_right" id="tags">detect|ecg</td> 
                </tr>
            </table>
        </span>
        <!-- [OR] Visit https://img.shields.io in order to create a tag badge-->
    </div>
</div>

One of the distinctive characteristics of the ECG signal corresponds to its periodicity, something that is not common in physiological terms.

Due to this characteristic, the study of cardiac cycle variability (heart rate variability) defines an important segment of ECG analysis.

However, heart rate variability analysis is dependent of the detection of ECG R peaks, which is the main topic of the present **<span class="color5">Jupyter Notebook</span>**. This task can be achieved by applying the Pan-Tompkins algorithm, translated to **<span class="color1">Python</span>** paradigm by Raja Selvaraj <a href="https://github.com/RajaS/ecgtk"><img src="../../images/icons/link.png" width="10px" height="10px" style="display:inline"></a>

<hr>

<p class="steps">1 - Importation of the needed packages and definition of auxiliary functions</p>

In [1]:
# OpenSignals Tools own package for loading and plotting the acquired data
import opensignalstools.open as ostOpen
import opensignalstools.visualise as ostVis
import opensignalstools.detect as ostDetect

# Scientific packages
import numpy
import scipy.signal as scipySig

# Base packages used in OpenSignals Tools Notebooks for ploting data
from bokeh.plotting import figure, output_file, show
from bokeh.io import output_notebook
from bokeh.layouts import gridplot
from bokeh.models.tools import *
output_notebook(hide_banner=True)

In [2]:
# Computation of threshold based on signal and noise values
def buffer_update(signal_peak, noise_peak):
    threshold = noise_peak + 0.25 * (signal_peak - noise_peak)
    return threshold

# Computes all possible peaks and their amplitudes
def detect_possible_pks(integrated_signal):
    possible_peaks = [i for i in range (0,len(integrated_signal)-1) 
                      if integrated_signal[i-1] < integrated_signal[i] and integrated_signal[i] > integrated_signal[i+1]]
    possible_amplitudes = [integrated_signal[k] for k in possible_peaks]
    
    return numpy.array(possible_peaks), possible_amplitudes

# Computes all probable peaks
def detect_probable_pks(possible_peaks, possible_amplitudes, fs):
    # Minimum RR interval = 200 ms
    min_RR = (fs / 1000) * 200
    
    probable_peaks = []

    # Starts with first peak
    if len(possible_peaks) == 0:
        raise Exception("No Peaks Detected.")
    peak_candidate_i = possible_peaks[0]
    peak_candidate_amp = possible_amplitudes[0]
    for peak_i, peak_amp in zip(possible_peaks, possible_amplitudes):
        if peak_i - peak_candidate_i <= min_RR and peak_amp > peak_candidate_amp:
            peak_candidate_i = peak_i
            peak_candidate_amp = peak_amp
        elif peak_i - peak_candidate_i > min_RR:
            probable_peaks += [peak_candidate_i - 6] # Delay of 6 samples
            peak_candidate_i = peak_i
            peak_candidate_amp = peak_amp
        else:
            pass

    return numpy.array(probable_peaks)

# Computes all definitive peaks
def detect_definitive_pks(probable_peaks, integrated_signal, threshold, signal_peak_1, noise_peak_1, rr_buffer, fs):
        noise_peak_out = noise_peak_1
        signal_peak_out = signal_peak_1
        peaks_amp = [integrated_signal[peak] for peak in probable_peaks]
        definitive_peaks = []
        for i, p in enumerate(probable_peaks):
            amp = peaks_amp[i]
            
            # Accept if larger than threshold and slope in raw signal is +-30% of previous slopes
            if amp > threshold:
                definitive_peaks, signal_peak_out, rr_buffer = acceptpeak(p, amp, definitive_peaks, signal_peak_1, rr_buffer)
                
            # Accept as qrs if higher than half threshold, but is 360 ms after last qrs and next peak is more than 1.5 rr 
            # intervals away just abandon it if there is no peak before or after
            elif amp > threshold / 2 and len(definitive_peaks) > 0 and len(probable_peaks) > i + 1:
                mean_rr = numpy.mean(rr_buffer)
                last_qrs_ms = (p - definitive_peaks[-1]) * (1000 / fs)
                last_qrs_to_next_peak = probable_peaks[i + 1] - definitive_peaks[-1]
                if last_qrs_ms > 360 and last_qrs_to_next_peak > 1.5 * mean_rr:
                    definitive_peaks, signal_peak_out, rr_buffer = acceptpeak(p, amp, definitive_peaks, signal_peak_1, rr_buffer)
                else:
                    noise_peak_out = noisepeak(amp, noise_peak_1)
            
            # If not either of these it is noise
            else:
                noise_peak_out = noisepeak(amp, noise_peak_1)
            
            # Update of the threshold
            threshold = buffer_update(signal_peak_1, noise_peak_out)
        
        definitive_peaks = numpy.array(definitive_peaks)
        return definitive_peaks

# Accepting Criteriums
def acceptpeak(peak, amp, definitive_peaks_in, signal_peak_in, rr_buffer_in):
    definitive_peaks_out = definitive_peaks_in
    definitive_peaks_out = numpy.append(definitive_peaks_out, peak)

    rr_buffer_out = rr_buffer_in
    
    # Update of signal peak value
    signal_peak_out = 0.125 * amp + 0.875 * signal_peak_in # signal_peak_1 is the running estimate of the signal peak
    if len(definitive_peaks_out) > 1:
        rr_buffer_out.pop(0)
        rr_buffer_out = rr_buffer_out + [definitive_peaks_out[-1] - definitive_peaks_out[-2]]
        
    return numpy.array(definitive_peaks_out), signal_peak_out, rr_buffer_out

# Noise peak value update
def noisepeak(amp, noise_peak_in):
    noise_peak_out = 0.125 * amp + 0.875 * noise_peak_in # noise_peak_1 is the running estimate of the noise peak
    
    return noise_peak_out

<p class="steps">2 - Load of acquired ECG data</p>

In [3]:
# Load of data
data, header = ostOpen.loadData("../Open/signals/ecg_4000_Hz.h5", getHeader=True)

<p class="steps">3 - Identification of mac address of the device and the channel used during acquisition</p>

In [4]:
mac_address = list(data.keys())[0]
channel = list(data[mac_address].keys())[0]

print ("Mac Address: " + str(mac_address) + " Channel: " + str(channel))

Mac Address: 00:07:80:D8:A7:F9 Channel: CH1


<p class="steps">4 - Storage of sampling frequency and acquired data inside variables</p>

In [5]:
# Sampling frequency and acquired data
fs = header[mac_address]["sampling rate"]

# Signal Samples
signal = data[mac_address][channel]
time = numpy.linspace(0, len(signal) / fs, len(signal))

<p class="steps">5 - Simplification of ECG signal (Isolation of abrupt transitions)</p>

<p class="steps">5.1 - Step 1 of Pan-Tompkins Algorithm - ECG Filtering (Bandpass between 5 and 15 Hz)</p>

In [6]:
# Step 1 of Pan-Tompkins Algorithm
nyquist_fs = fs / 2
low_cutoff = 5
high_cutoff = 15

normalized_cutoffs = [low_cutoff / nyquist_fs, high_cutoff / nyquist_fs]
transfer_function_num, transfer_function_den = scipySig.butter(2, normalized_cutoffs, btype='bandpass')
filtered_signal = scipySig.filtfilt(transfer_function_num, transfer_function_den, signal, padlen=150)

In [7]:
# ======================================= Representation of the output of Step 1 ==================================================
# List that store the figure handler
list_figures_1 = [[]]

# Plotting of Original Signal    
list_figures_1[-1].append(figure(x_axis_label='Time (s)', y_axis_label='Raw Data', title="Original ECG", **ostVis.opensignalsKwargs("figure")))
list_figures_1[-1][-1].line(time[:fs], signal[:fs], **ostVis.opensignalsKwargs("line"))

# Plotting of Filtered Signal    
list_figures_1[-1].append(figure(x_axis_label='Time (s)', y_axis_label='Raw Data', title="Post Filtering Signal", **ostVis.opensignalsKwargs("figure")))
list_figures_1[-1][-1].line(time[:fs], filtered_signal[:fs], **ostVis.opensignalsKwargs("line"))

In [28]:
grid_plot_1 = gridplot(list_figures_1, **ostVis.opensignalsKwargs("gridplot"))
show(grid_plot_1)

<p class="steps">5.2 - Step 2 of Pan-Tompkins Algorithm - ECG Differentiation</p>

In [9]:
# Step 2 of Pan-Tompkins Algorithm
differentiated_signal = numpy.diff(filtered_signal)

In [10]:
# ======================================= Representation of the output of Step 2 ==================================================
# List that store the figure handler
list_figures_2 = [[]]

# Plotting of Filtered Signal    
list_figures_2[-1].append(figure(x_axis_label='Time (s)', y_axis_label='Raw Data', title="Post Filtering Signal", **ostVis.opensignalsKwargs("figure")))
list_figures_2[-1][-1].line(time[:fs], filtered_signal[:fs], **ostVis.opensignalsKwargs("line"))

# Plotting of Differentiated Signal    
list_figures_2[-1].append(figure(x_axis_label='Time (s)', y_axis_label='Raw Data', title="Post Differentiation Signal", **ostVis.opensignalsKwargs("figure")))
list_figures_2[-1][-1].line(time[1:fs], differentiated_signal[:fs - 1], **ostVis.opensignalsKwargs("line"))

In [29]:
grid_plot_2 = gridplot(list_figures_2, **ostVis.opensignalsKwargs("gridplot"))
show(grid_plot_2)

<p class="steps">5.3 - Step 3 of Pan-Tompkins Algorithm - ECG Rectification</p>

In [12]:
# Step 3 of Pan-Tompkins Algorithm
squared_signal = differentiated_signal * differentiated_signal

In [13]:
# ======================================= Representation of the output of Step 3 ==================================================
# List that store the figure handler
list_figures_3 = [[]]

# Plotting of Differentiated Signal    
list_figures_3[-1].append(figure(x_axis_label='Time (s)', y_axis_label='Raw Data', title="Post Differentiation Signal", **ostVis.opensignalsKwargs("figure")))
list_figures_3[-1][-1].line(time[1:fs], differentiated_signal[:fs - 1], **ostVis.opensignalsKwargs("line"))

# Plotting of Rectified Signal    
list_figures_3[-1].append(figure(x_axis_label='Time (s)', y_axis_label='Raw Data', title="Post Rectification Signal", **ostVis.opensignalsKwargs("figure")))
list_figures_3[-1][-1].line(time[1:fs], squared_signal[:fs - 1], **ostVis.opensignalsKwargs("line"))

In [30]:
grid_plot_3 = gridplot(list_figures_3, **ostVis.opensignalsKwargs("gridplot"))
show(grid_plot_3)

<p class="steps">5.4 - Step 4 of Pan-Tompkins Algorithm - ECG Integration</p>

In [15]:
# Step 4 of Pan-Tompkins Algorithm (Moving window integration)
nbr_sampls_int_wind = int(0.080 * fs)
integrated_signal = numpy.zeros_like(squared_signal)
cumulative_sum = squared_signal.cumsum()
integrated_signal[nbr_sampls_int_wind:] = (cumulative_sum[nbr_sampls_int_wind:] - cumulative_sum[:-nbr_sampls_int_wind]) / nbr_sampls_int_wind
integrated_signal[:nbr_sampls_int_wind] = cumulative_sum[:nbr_sampls_int_wind] / numpy.arange(1, nbr_sampls_int_wind + 1)

In [16]:
# ======================================= Representation of the output of Step 4 ==================================================
# List that store the figure handler
list_figures_4 = [[]]

# Plotting of Rectified Signal    
list_figures_4[-1].append(figure(x_axis_label='Time (s)', y_axis_label='Raw Data', title="Post Rectification Signal", **ostVis.opensignalsKwargs("figure")))
list_figures_4[-1][-1].line(time[1:fs], squared_signal[:fs - 1], **ostVis.opensignalsKwargs("line"))

# Plotting of Integrated Signal    
list_figures_4[-1].append(figure(x_axis_label='Time (s)', y_axis_label='Raw Data', title="Post Integration Signal", **ostVis.opensignalsKwargs("figure")))
list_figures_4[-1][-1].line(time[1:fs], integrated_signal[:fs - 1], **ostVis.opensignalsKwargs("line"))

In [31]:
grid_plot_4 = gridplot(list_figures_4, **ostVis.opensignalsKwargs("gridplot"))
show(grid_plot_4)

<p class="steps">6 - Application of the ECG simplified version to a sequence of peak identification steps</p>
This task is achieved by using a threshold system, with this threshold being dynamically adjusted along the acquisition, like originally proposed by Pan and Tompkins <a href="http://www.robots.ox.ac.uk/~gari/teaching/cdt/A3/readings/ECG/Pan+Tompkins.pdf"><img src="../../images/icons/link.png" width="10px" height="10px" style="display:inline"></a>

<p class="steps">6.1 - Initialisation of the R peak detection algorithm</p>

In [18]:
rr_buffer = [1] * 8
signal_peak_1 = max(integrated_signal[fs:2 * fs]) #  Running estimate of the signal peak
noise_peak_1 = 0 #  Running estimate of the noise peak
threshold = buffer_update(signal_peak_1, noise_peak_1)

<p class="steps">6.2 - Detection of possible R peaks</p>

In [19]:
possible_peaks, possible_amplitudes = detect_possible_pks(integrated_signal)

<p class="steps">6.3 - Detection of probable R peaks</p>

In [20]:
probable_peaks = detect_probable_pks(possible_peaks, possible_amplitudes, fs)

<p class="steps">6.4 - Identification of definitive R peaks</p>

In [21]:
definitive_peaks = detect_definitive_pks(probable_peaks, integrated_signal, threshold, signal_peak_1, noise_peak_1, rr_buffer, fs)

# Conversion to integer type.
definitive_peaks = numpy.array(list(map(int, definitive_peaks)))

<p class="steps">6.5 - Correcting step</p>

In [22]:
map_integers = definitive_peaks - 40 * (fs / 1000)
definitive_peaks_reph = numpy.array(list(map(int, map_integers)))

<p class="steps">7 - Evolution of the detected peaks (from possible to definitive R peaks)</p>

In [23]:
# ======================================= Representation of the detected peaks =====================================================
# List that store the figure handler
list_figures_5 = []

# Plotting of a signal segment with 2 seconds
segment_data = numpy.array(signal[:2 * fs])
segment_int = numpy.array(integrated_signal[:2 * fs])
segment_time = numpy.array(time[:2 * fs])

# Peaks list for the 2 seconds window
possible_peaks_wind = possible_peaks[possible_peaks < len(segment_int)]
probable_peaks_wind = probable_peaks[probable_peaks < len(segment_int)]
definitive_peaks_wind = definitive_peaks[definitive_peaks < len(segment_int)]

list_figures_5.append(figure(x_axis_label='Time (s)', y_axis_label='Raw Data', **ostVis.opensignalsKwargs("figure")))
list_figures_5[-1].line(segment_time, segment_int, **ostVis.opensignalsKwargs("line"))
list_figures_5[-1].circle(segment_time[definitive_peaks_wind], segment_int[definitive_peaks_wind], size=30, color="#00893E", legend="Definitive Peaks")
list_figures_5[-1].circle(segment_time[probable_peaks_wind], segment_int[probable_peaks_wind], size=20, color="#009EE3", legend="Probable Peaks")
list_figures_5[-1].circle(segment_time[possible_peaks_wind], segment_int[possible_peaks_wind], size=10, color="#302683", legend="Possible Peaks")

In [32]:
show(list_figures_5[-1])

*This procedure can be automatically done by **detectRPeaksData** function in **detect** modeule of **<span class="color2">opensignalstools</span>** package*

In [25]:
ostDetect.detectRPeaks(signal, fs, timeUnits=True, plotResult=True)

(array([ 0.08200198,  0.85577067,  1.62753931,  2.3848076 ,  3.11507524,
         3.80909201,  4.49360854,  5.18112515,  5.88589217,  6.59365927,
         7.327927  ,  8.0979456 ,  8.89721491,  9.66498345]),
 array([38032, 37666, 37598, 37914, 38078, 38000, 37582, 37665, 37722,
        38000, 38016, 37808, 37804, 37884], dtype=uint16))

<span class="color6">**Auxiliary Code Segment (should not be replicated by the user)**</span>

In [26]:
from IPython.display import Javascript
ostVis.opensignalsStyle([item for sublist in list_figures_1 for item in sublist])
ostVis.opensignalsStyle([item for sublist in list_figures_2 for item in sublist])
ostVis.opensignalsStyle([item for sublist in list_figures_3 for item in sublist])
ostVis.opensignalsStyle([item for sublist in list_figures_4 for item in sublist])
ostVis.opensignalsStyle(list_figures_5)
Javascript("Jupyter.notebook.execute_cells([17, 21, 25, 29, 43])")

<IPython.core.display.Javascript object>

In [27]:
from opensignalstools.__notebook_support__ import cssStyleApply
cssStyleApply()

.................... CSS Style Applied to Jupyter Notebook .........................
