In [1]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import wfdb
import csv
import os
from scipy.signal import butter, filtfilt
import scipy
import time
from scipy.interpolate import CubicSpline
from sklearn.preprocessing import MinMaxScaler

In [2]:
def perf(labels, peaks, minmax):
    x = np.concatenate([np.array(labels), np.array(peaks)]) #list(set(QRS + r_peaks))
    x.sort()
    diff = x[1:]-x[:-1]
    gps = np.concatenate([[0], np.cumsum(diff>=minmax)])
    temp = [x[gps==i] for i in range(gps[-1]+1)]
    TP = 0
    FP = 0
    FN = 0
    list_F = []
    for sublist in temp:
        if len(sublist) == 2:
            TP += 1
        else:
            list_F.append(sublist)
            if sublist[0] in labels:
                FN += 1
            else:
                FP += 1
    return TP, FP, FN #, list_F

def print_signal(signal, description = "a"):
    print(f"signal de longueur: {len(signal)}")
    plt.figure()
    plt.plot(signal)
    #plt.scatter(QRS, [signal[i] for i in QRS], color='red')
    plt.title(label= description)
    plt.show()
    
def print_signal_qrs(signal, qrs, description = "a"):
    print(f"signal de longueur: {len(signal)}")
    plt.figure()
    plt.plot(signal)
    plt.scatter(qrs, [signal[i] for i in qrs], color='red')
    plt.title(label= description)
    plt.show()

In [3]:
import numpy as np
from scipy.signal import cheby1, filtfilt

# Fonction pour filtrer le signal avec un filtre passe-bande de Chebyshev
def bandpass_filter(signal, fs, lowcut, highcut, order=4):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = cheby1(order, 0.5, [low, high], btype='band')
    return filtfilt(b, a, signal)

# Fonction pour la différenciation fractionnaire d'ordre numérique
def dfod(signal, order=0.5):
    return np.diff(signal, n=int(order))

# Fonction pour la transformation de longueur de courbe absolue (A-CLT)
def aclt(signal):
    return np.cumsum(np.abs(np.diff(signal)))

# Fonction pour la moyenne mobile
def moving_average(signal, window_size=30):
    return np.convolve(signal, np.ones(window_size) / window_size, mode='same')

# Fonction pour détecter les pics R
def detect_r_peaks(ecg_signal, fs=360):
    # Filtrage passe-bande
    filtered_signal = bandpass_filter(ecg_signal, fs, 0.5, 40)
    
    # Différenciation fractionnaire
    differentiated_signal = dfod(filtered_signal)
    
    # Transformation A-CLT
    aclt_signal = aclt(differentiated_signal)
    
    # Moyenne mobile
    moving_avg_signal = moving_average(aclt_signal)
    
    # Détection des pics R
    r_peaks = []
    threshold = np.mean(moving_avg_signal)
    
    for i in range(1, len(aclt_signal) - 1):
        if aclt_signal[i] > threshold and aclt_signal[i] > aclt_signal[i - 1] and aclt_signal[i] > aclt_signal[i + 1]:
            r_peaks.append(i)
    
    return r_peaks


df = pd.read_csv('data_csv/mit_bih_Arrhythmia/101.csv') #207
ecg_signal = np.array(df["MLII"], dtype=np.float32) #[:10000] #[:maxi]
fs = 360
QRS = [int(i) for i in df["labels"].dropna().tolist()[1:]]
r_peaks = detect_r_peaks(ecg_signal)
perf(QRS,r_peaks,36)

(1, 0, 1871)

In [4]:
r_peaks

[]