# **Caracterización de la señal ECG**

In [2]:
# Package used for loading data from the input h5 file
from h5py import File

# Package intended to work with arrays
from numpy import array

# biosignalsnotebooks python package
import biosignalsnotebooks as bsnb

# Scientific packages
from numpy import linspace, max, min, average, std, sum, sqrt, where, argmax
from numpy import array, mean, average, linspace, where
from scipy.integrate import cumtrapz
from scipy.signal import welch
import numpy as np
import matplotlib.pyplot as plt
#from bokeh.plotting import figure, show
from bokeh.plotting import figure, show, output_notebook
from bokeh.models import Band, ColumnDataSource
from bokeh.models import BoxAnnotation

import bokeh.plotting as bp
from bokeh.io import output_notebook, show

import pywt
from bokeh.layouts import column



**Mostrar la señal de ECG**

In [4]:
# cargar la señal

relative_file_path = "C:\\Users\\Usuario\\Documents\\2024-1\\ECG_reposo.h5"
data, header = bsnb.load(relative_file_path, get_header=True)
print(data)

{'CH1': array([505, 502, 510, ..., 500, 510, 503], dtype=uint32), 'CH2': array([489, 479, 473, ..., 615, 606, 591], dtype=uint32), 'CH3': array([0, 0, 0, ..., 0, 0, 0], dtype=uint32), 'CH4': array([543, 540, 541, ..., 490, 483, 498], dtype=uint32), 'CH5': array([39, 39, 39, ..., 39, 39, 39], dtype=uint32), 'CH6': array([4, 4, 4, ..., 4, 4, 4], dtype=uint32)}


In [5]:
#mostrar la señal
signal = data["CH2"]
fs= header["sampling rate"]
time = bsnb.generate_time(signal, fs)

# Nueva Bokeh figure
bokeh_figure = figure(x_axis_label = 'Time (s)', y_axis_label='Raw Data')
bokeh_figure.line(time, signal, legend_label="Data Original")
show(bokeh_figure)
#http://notebooks.pluxbiosignals.com/notebooks/Categories/Visualise/plot_acquired_data_single_rev.html

**Convertir de V a mV**

In [6]:
device = header["device"]
resolution = int(header["resolution"][0])
#Conversión final de unidades (a mV )
vcc = 3000 # mV
gain = 1000
#signal_mv = (((array(signal) / 2**resolution) - 0.5) * vcc) / gain

# Let's convert the signal's units, since we know it is a ECG signal
signal_mv = bsnb.raw_to_phy("ECG", "biosignalsplux", signal, resolution, "mV")
time1 = bsnb.generate_time(signal_mv, fs)

bsnb.plot([time, time1], [signal, signal_mv], legend_label=["Original Data (V)", "Original Data (mV)"], y_axis_label=["Raw Data", "Raw Data"], grid_plot=True, grid_lines=2, grid_columns=1, x_axis_label="Time (s)")

**Eliminación de la linea base**

In [12]:
#Eliminación del componente continuo de nuestra señal (cambio de línea de base mediante la resta del valor promedio)
#Esta tarea asegura una mayor estabilidad de nuestro sistema de filtrado.
# Baseline shift.
signal = array(signal_mv) - mean(signal_mv)

**Espectro de la señal de ECG  original**

In [15]:
#Generación del espectro eléctrico
# Power spectrum
freq_axis_1, power_spect_1 = bsnb.plotfft(signal, fs)

bsnb.plot(freq_axis_1, power_spect_1,y_axis_label="Power Spectrum", grid_plot=True, grid_lines=1, grid_columns=1, x_axis_label="Frequency (Hz)")


**FILTRO WAVELET**

In [17]:
t=time

# Definir la función para la eliminación de ruido usando wavelets
def wavelet_denoise(signal, wavelet, level, threshold):
    coeffs = pywt.wavedec(signal, wavelet, level=level)
    for i in range(1, len(coeffs)):
        coeffs[i] = np.where(np.abs(coeffs[i]) < threshold * np.max(coeffs[i]), 0, coeffs[i])
    return pywt.waverec(coeffs, wavelet)
# Suponiendo que 'signal' y 't' ya están definidos
# signal = ...
# t = ...
# Parámetros para la función
wavelet = 'sym5' 
level = 4 

# Calcular los coeficientes wavelet de la señal
coeffs = pywt.wavedec(signal, wavelet, level=level)

# Calcular el umbral universal
N = len(coeffs)
sigma = np.median(np.abs(np.concatenate(coeffs[1:]))) / 0.6745
threshold = sigma * np.sqrt(2 * np.log(N)/N)
print(np.concatenate(coeffs[1:]))
# Filtrar la señal
filtered_signal = wavelet_denoise(signal, wavelet, level, threshold)

# Configuración para mostrar las gráficas en el notebook
output_notebook()

# Crear la gráfica de la señal original
p1 = figure(title="Señal ECG en reposo", x_axis_label='Tiempo (s)', y_axis_label='Amplitud', plot_width=800, plot_height=400)
p1.line(t, signal, legend_label='Reposo', line_width=2)
p1.legend.location = "top_left"

# Crear la gráfica de la señal filtrada
p2 = figure(title="Señal ECG Filtrada", x_axis_label='Tiempo (s)', y_axis_label='Amplitud', plot_width=800, plot_height=400)
p2.line(t[:len(filtered_signal)], filtered_signal, legend_label='Señal Filtrada 1', line_width=2, color='orange')
p2.legend.location = "top_left"

# Mostrar las gráficas en una disposición de columna
show(column(p1, p2))


[-1.1655927   0.42732891 -0.48585591 ...  1.17462092 -1.09159005
  0.37070241]


**Espectro de la señal de ECG  filtrada**

In [19]:
#Generación del espectro eléctrico
# Power spectrum
freq_axis_1, power_spect_1 = bsnb.plotfft(filtered_signal, fs)

bsnb.plot(freq_axis_1, power_spect_1,y_axis_label="Power Spectrum", grid_plot=True, grid_lines=1, grid_columns=1, x_axis_label="Frequency (Hz)")



**Peak Energy Envelope (PEE)**

In [21]:

def peak_energy_envelope(filtered_signal, window_length=43):
    # Step 1: First-order Differentiation
    DSn = np.diff(filtered_signal, n=1)
    
    # Step 2: Amplitude Normalization
    DSn_normalized = DSn / np.max(np.abs(DSn))
    
    # Step 3: Squaring Operation
    PEn = DSn_normalized ** 2
    
    # Step 4: Moving Average Filter
    # Create the moving average filter kernel
    filter_kernel = np.ones(window_length) / window_length
    
    # Apply the moving average filter
    PSn = np.convolve(PEn, filter_kernel, mode='same')
    
    return PSn

    
# Calculate Peak Energy Envelope
PEE_signal = peak_energy_envelope(filtered_signal)
    
# Print or plot the resulting signal
print(PEE_signal)

output_notebook()

 # Plot the original ECG signal
p1 = figure(title="Original ECG Signal", x_axis_label='Sample', y_axis_label='Amplitude', width=800, height=400)
p1.line(np.arange(len(filtered_signal)), filtered_signal, line_width=2, color="navy", legend_label="Original")

# Plot the Peak Energy Envelope (PEE) signal
p2 = figure(title="Peak Energy Envelope (PEE) Signal", x_axis_label='Sample', y_axis_label='Amplitude', width=800, height=400)
p2.line(np.arange(len(PEE_signal)), PEE_signal, line_width=2, color="green", legend_label="PEE")

# Show the plots
show(p1)
show(p2)

[0.00143197 0.00143497 0.00143713 ... 0.00565751 0.00557823 0.00532138]


In [23]:
import biosignalsnotebooks as bsnb
from bokeh.plotting import figure, show, output_notebook
import numpy as np

# Detectar los picos R
detected_peaks = bsnb.detect_r_peaks(filtered_signal, fs, time_units=True, plot_result=True)

# Acceder a la información necesaria para graficar
signal = detected_peaks["filtered_signal"]
peaks = detected_peaks["R-peaks"]
time = detected_peaks["time"]

# Crear una figura de Bokeh
p = figure(title="Detected R Peaks", x_axis_label='Time (s)', y_axis_label='Amplitude')

# Graficar la señal filtrada
p.line(time, signal, line_width=2, color='blue', legend_label='Filtered Signal')

# Marcar los picos R
p.circle(time[peaks], signal[peaks], size=5, color='red', legend_label='R Peaks')

# Mostrar la gráfica
output_notebook()
show(p)

TypeError: tuple indices must be integers or slices, not str

In [None]:
import scipy.signal
import numpy as np
import scipy.signal
from bokeh.plotting import figure, show, output_notebook

def find_peaks(signal):
    peaks, _ = scipy.signal.find_peaks(signal)
    return peaks

def refine_peaks(ecg_signal, estimated_peaks, window_size):
    refined_peaks = []
    for peak in estimated_peaks:
        window_start = max(0, peak - window_size)
        window_end = min(len(ecg_signal), peak + window_size + 1)
        if window_start >= window_end:
            continue
        refined_peak = window_start + np.argmax(ecg_signal[window_start:window_end])
        refined_peaks.append(refined_peak)
    return np.array(refined_peaks)

filtered_signal = np.array(filtered_signal)
filtered_signal = filtered_signal[~np.isnan(filtered_signal)]  # Remove NaN values


PEE_signal = peak_energy_envelope(filtered_signal)
# Paso 3: Encontrar los picos estimados en la señal PEE
estimated_peaks = find_peaks(PEE_signal)

# Verificar si se encontraron picos
if estimated_peaks.size == 0:
    print("No peaks found in the PEE signal.")
    refined_peaks = np.array([])  # Si no se encontraron picos, definir refined_peaks como vacío
else:
    # Paso 4: Refinar las ubicaciones de los picos buscando la amplitud máxima dentro de ±25 muestras
    refined_peaks = refine_peaks(filtered_signal, estimated_peaks,window_size=25)

# Imprimir las ubicaciones de los picos estimados y refinados
print("Estimated Peaks:", estimated_peaks)
print("Refined Peaks:", refined_peaks)

# Usar Bokeh para trazar las señales y los picos detectados
output_notebook()

# Plot the original ECG signal
p1 = figure(title="Original ECG Signal with Detected R Peaks", x_axis_label='Sample', y_axis_label='Amplitude', width=800, height=400)
p1.line(np.arange(len(filtered_signal)), filtered_signal, line_width=2, color="navy", legend_label="Original")
if refined_peaks.size > 0:  # Verificar si refined_peaks no está vacío antes de graficar
    p1.circle(refined_peaks, filtered_signal[refined_peaks], size=8, color="red", legend_label="R Peaks")

# Plot the Peak Energy Envelope (PEE) signal
p2 = figure(title="Peak Energy Envelope (PEE) Signal with Estimated Peaks", x_axis_label='Sample', y_axis_label='Amplitude', width=800, height=400)
p2.line(np.arange(len(PEE_signal)), PEE_signal, line_width=2, color="green", legend_label="PEE")
if estimated_peaks.size > 0:  # Verificar si estimated_peaks no está vacío antes de graficar
    p2.circle(estimated_peaks, PEE_signal[estimated_peaks], size=8, color="orange", legend_label="Estimated Peaks")

# Mostrar los gráficos
show(p1)
show(p2)


In [26]:
from bokeh.models import Span, Label
# Generar el tacograma utilizando bsnb
tachogram_data, tachogram_time = bsnb.tachogram(filtered_signal, fs, signal=True, out_seconds=True)
#Removal of ectopic beats
tachogram_data_NN, tachogram_time_NN = bsnb.remove_ectopy(tachogram_data, tachogram_time)
bpm_data = (1 / array(tachogram_data_NN)) * 60
#
# Maximum, Minimum and Average RR Interval
max_rr = max(tachogram_data_NN)
min_rr = min(tachogram_data_NN)
avg_rr = average(tachogram_data_NN)

# Maximum, Minimum and Average Heart Rate
max_hr = 1 / min_rr # Cycles per second
max_bpm = max_hr * 60 # BPM

min_hr = 1 / max_rr # Cycles per second
min_bpm = min_hr * 60 # BPM

avg_hr = 1 / avg_rr # Cyles per second
avg_bpm = avg_hr * 60 # BPM

# SDNN
sdnn = std(tachogram_data_NN)

time_param_dict = {"Maximum RR": max_rr, "Minimum RR": min_rr, "Average RR": avg_rr, "Maximum BPM": max_bpm, "Minimum BPM": min_bpm, "Average BPM": avg_bpm, "SDNN": sdnn}


In [28]:
# Creación de la gráfica del Tacograma
p_rr = figure(title="Tacograma con Parámetros Calculados", x_axis_label='Tiempo (s)', y_axis_label='Intervalos RR (s)', plot_width=800, plot_height=400)

# Línea del tacograma original
p_rr.line(tachogram_time_NN, tachogram_data_NN, legend_label="Tacograma Original", line_width=2, line_color="blue")

# Líneas horizontales para los valores de RR calculados
max_rr_line = Span(location=time_param_dict["Maximum RR"], dimension='width', line_color='red', line_dash='dashed', line_width=2)
min_rr_line = Span(location=time_param_dict["Minimum RR"], dimension='width', line_color='purple', line_dash='dashed', line_width=2)
avg_rr_line = Span(location=time_param_dict["Average RR"], dimension='width', line_color='orange', line_dash='dashed', line_width=2)

p_rr.add_layout(max_rr_line)
p_rr.add_layout(min_rr_line)
p_rr.add_layout(avg_rr_line)

# Marcadores para SDNN
sdnn_marker = time_param_dict["Average RR"] - time_param_dict["SDNN"]
p_rr.circle(tachogram_time_NN[np.argmin(np.abs(tachogram_data_NN - sdnn_marker))], sdnn_marker, size=10, color='brown', legend_label="SDNN")

# Etiquetas para las líneas horizontales de RR
p_rr.add_layout(Label(x=0, y=time_param_dict["Maximum RR"], text='Máximo RR', text_color='red', y_offset=5))
p_rr.add_layout(Label(x=0, y=time_param_dict["Minimum RR"], text='Mínimo RR', text_color='purple', y_offset=5))
p_rr.add_layout(Label(x=0, y=time_param_dict["Average RR"], text='Promedio RR', text_color='orange', y_offset=5))

show(p_rr)

In [30]:
# Creación de la gráfica del Heart Rate (BPM)
p_bpm = figure(title="Heart Rate (BPM) con Parámetros Calculados", x_axis_label='Tiempo (s)', y_axis_label='BPM', plot_width=800, plot_height=400)

# Línea del Heart Rate
p_bpm.line(tachogram_time_NN, bpm_data, legend_label="Heart Rate (BPM)", line_width=2, line_color="blue")

# Líneas horizontales para los valores de BPM calculados
max_bpm_line = Span(location=time_param_dict["Maximum BPM"], dimension='width', line_color='green', line_dash='dashed', line_width=2)
min_bpm_line = Span(location=time_param_dict["Minimum BPM"], dimension='width', line_color='pink', line_dash='dashed', line_width=2)
avg_bpm_line = Span(location=time_param_dict["Average BPM"], dimension='width', line_color='brown', line_dash='dashed', line_width=2)

p_bpm.add_layout(max_bpm_line)
p_bpm.add_layout(min_bpm_line)
p_bpm.add_layout(avg_bpm_line)

# Etiquetas para las líneas horizontales de BPM
p_bpm.add_layout(Label(x=0, y=time_param_dict["Maximum BPM"], text='Máximo BPM', text_color='green', y_offset=5))
p_bpm.add_layout(Label(x=0, y=time_param_dict["Minimum BPM"], text='Mínimo BPM', text_color='pink', y_offset=5))
p_bpm.add_layout(Label(x=0, y=time_param_dict["Average BPM"], text='Promedio BPM', text_color='brown', y_offset=5))

# Mostrar las gráficas

show(p_bpm)