In [1]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, BatchNormalization, MaxPooling1D, Flatten, Dense, Dropout
from keras.regularizers import l1_l2
import numpy as np
import wfdb
import os
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns
from keras.regularizers import l1_l2
from scipy.signal import lfilter, firwin, find_peaks, filtfilt, butter, spectrogram
import pywt
from pyhrv.hrv import hrv
import pyhrv

In [2]:
def get_data(path):
    """Load every WFDB record found under ``path``.

    Walks ``path`` recursively and, for each ``.hea`` header file, reads the
    record with ``wfdb.rdsamp``. Records that fail to load are reported on
    stdout and skipped, so a single unreadable record does not abort the scan.

    Directories and files are visited in sorted order so the returned lists
    have the same ordering on every run and on every platform.

    Parameters
    ----------
    path : str
        Root directory to search.

    Returns
    -------
    signals : list
        First value returned by ``wfdb.rdsamp`` for each readable record.
    all_meta : list
        Second value returned by ``wfdb.rdsamp`` for each readable record;
        ``all_meta[k]`` belongs to ``signals[k]``.
    """
    signals = []
    all_meta = []  # Metadata for each signal, index-aligned with `signals`
    for root, dirs, files in os.walk(path):
        # os.walk yields entries in arbitrary order; sorting `dirs` in place
        # fixes the order in which subdirectories are visited.
        dirs.sort()
        for filename in sorted(files):
            if not filename.endswith('.hea'):
                continue
            # The record is addressed by its path with the extension removed.
            record_path = os.path.join(root, filename[:-len('.hea')])
            try:
                signal, meta = wfdb.rdsamp(record_path)
            except Exception as e:
                # Deliberate best effort: report the bad record and move on.
                print(f"Failed to read {record_path}: {e}")
                continue
            signals.append(signal)
            all_meta.append(meta)
    return signals, all_meta

In [3]:
def get_diag(all_meta, csv_file='Nombres.csv'):
    """Extract the diagnosis codes of every record and load the code table.

    Parameters
    ----------
    all_meta : list of dict
        Per-record metadata; each entry must provide a ``"comments"`` list
        with at least three items.
    csv_file : str, optional
        CSV file with at least the columns ``Snomed_CT`` and ``Full Name``.

    Returns
    -------
    diaglist : list of list of str
        For each record, the comma-separated codes found in its third comment.
    full_names : list
        Values of the ``Full Name`` column.
    snomed_cts : list
        Values of the ``Snomed_CT`` column, row-aligned with ``full_names``.
    """
    df = pd.read_csv(csv_file)
    snomed_cts = df['Snomed_CT'].tolist()
    full_names = df["Full Name"].tolist()

    # Takes the second space-separated token of the third comment and splits
    # it on commas. Presumably that comment looks like "Dx: code1,code2" --
    # TODO confirm against the dataset's header format.
    diaglist = [
        record_meta["comments"][2].split(" ")[1].split(",")
        for record_meta in all_meta
    ]
    return diaglist, full_names, snomed_cts
    


In [4]:
def flatten(lista):
    """Collapse each nested list in ``lista`` to its first element.

    Items that are ``list`` instances are replaced by their element at index
    0; every other item is kept unchanged. Despite the name, the remaining
    elements of a nested list are dropped, not spliced in.

    Raises
    ------
    IndexError
        If ``lista`` contains an empty list.
    """
    return [
        item[0] if isinstance(item, list) else item
        for item in lista
    ]

In [5]:
def freqCount(lista, names, code, threshold=1000):
    """Select the elements of ``lista`` that occur more than ``threshold`` times.

    Parameters
    ----------
    lista : iterable
        Items to count; each selected item must be convertible with ``int()``.
    names : list
        Display names, index-aligned with ``code``.
    code : list
        Integer codes, index-aligned with ``names``.
    threshold : int, optional
        An element is kept only if its count is strictly greater than this
        value. Defaults to 1000.

    Returns
    -------
    rNames : list
        Names of the selected elements whose integer value appears in
        ``code``. Selected elements that are not in ``code`` are skipped, so
        this list can be shorter than ``rElements``.
    rElements : list
        The selected elements, in order of first appearance in ``lista``.
    """
    frecuencias = {}
    for elemento in lista:
        frecuencias[elemento] = frecuencias.get(elemento, 0) + 1

    rElements = [
        elemento for elemento, conteo in frecuencias.items() if conteo > threshold
    ]

    # Build the lookup once instead of scanning `code` for every element.
    # setdefault keeps the first occurrence, matching list.index semantics.
    name_by_code = {}
    for known_code, known_name in zip(code, names):
        name_by_code.setdefault(known_code, known_name)

    rNames = [
        name_by_code[int(element)]
        for element in rElements
        if int(element) in name_by_code
    ]

    return rNames, rElements

In [6]:
def filter_Wavelet(ECGsignal1, wavelet='db8', level=8, threshold=0.2):
    """Denoise a signal by soft-thresholding its wavelet coefficients.

    The signal is decomposed with ``pywt.wavedec``, every coefficient array is
    soft-thresholded, and the result is rebuilt with ``pywt.waverec``.

    Parameters
    ----------
    ECGsignal1 : array_like
        Input signal; the transform runs along the last axis.
    wavelet : str, optional
        Wavelet name passed to PyWavelets. Defaults to ``'db8'``.
    level : int, optional
        Decomposition level. Defaults to 8.
    threshold : float, optional
        Soft-threshold value applied to every coefficient array.
        Defaults to 0.2.

    Returns
    -------
    numpy.ndarray
        Filtered signal with the same length along the last axis as the input.
    """
    coeffs = pywt.wavedec(ECGsignal1, wavelet, level=level)
    # NOTE(review): the approximation coefficients (coeffs[0]) are thresholded
    # together with the detail coefficients -- confirm this is intended.
    filtered_coeffs = [
        pywt.threshold(coeff, threshold, mode='soft') for coeff in coeffs
    ]
    filtered_signal = pywt.waverec(filtered_coeffs, wavelet)
    # Reconstruction can come back one sample longer than an odd-length
    # input; trim so the output stays aligned with the original samples.
    n_samples = np.shape(ECGsignal1)[-1]
    return filtered_signal[..., :n_samples]

In [7]:
def get_signals(rCodes, meta, signals):
    """Pair each signal with its first diagnosis code, for the codes in ``rCodes``.

    The first diagnosis code of record ``k`` is the first comma-separated
    entry of the second space-separated token of ``meta[k]["comments"][2]``.

    Parameters
    ----------
    rCodes : list of str
        Codes to keep, compared as strings against the parsed codes.
    meta : list of dict
        Per-record metadata with a ``"comments"`` list.
    signals : list
        Signals, index-aligned with ``meta``.

    Returns
    -------
    list of tuple
        ``(signal, int(code))`` pairs, grouped in the order of ``rCodes`` and,
        within each code, in record order.
    """
    first_codes = [
        record["comments"][2].split(" ")[1].split(",")[0]
        for record in meta
    ]
    return [
        (signals[idx], int(wanted))
        for wanted in rCodes
        for idx, found in enumerate(first_codes)
        if found == wanted
    ]

In [8]:
def OrderedLists(totList):
    """Group labelled signals into four per-class label and signal lists.

    Parameters
    ----------
    totList : iterable
        Items where index 0 is a 2-D signal and index 1 is an integer
        diagnosis code.

    Returns
    -------
    conj : tuple of list
        Four lists of class indices (all 0s, all 1s, all 2s, all 3s), one
        entry per matching item.
    conjS : tuple of list
        Four lists holding row 1 of each matching item's transposed signal,
        in the same class order as ``conj``.

    Items whose code is not one of the four handled codes are ignored.
    """
    # Diagnosis code -> class index.
    class_index = {
        164889003: 0,  # Atrial fibrillation
        426177001: 1,  # Sinus bradycardia
        426783006: 2,  # Normal sinus rhythm
        427084000: 3,  # Sinus tachycardia
    }
    label_groups = ([], [], [], [])
    signal_groups = ([], [], [], [])

    for entry in totList:
        diag = entry[1]
        # Row 1 of the transposed signal, i.e. column 1 of the original array
        # (presumably the second channel -- confirm the intended lead).
        trace = np.transpose(entry[0])[1]
        slot = class_index.get(diag)
        if slot is None:
            continue
        signal_groups[slot].append(trace)
        label_groups[slot].append(slot)

    return label_groups, signal_groups

In [10]:
# Root directory of the ECG database. Set the ECG_DATA_DIR environment
# variable to point at a local copy; the fallback is the location this
# notebook was originally run from.
path = os.environ.get(
    "ECG_DATA_DIR",
    "C:/Users/Eva/Downloads/a-large-scale-12-lead-electrocardiogram-database-for-arrhythmia-study-1.0.0/a-large-scale-12-lead-electrocardiogram-database-for-arrhythmia-study-1.0.0",
)
# Load every readable record; unreadable ones are printed and skipped.
signals, meta = get_data(path)

Failed to read C:/Users/Eva/Downloads/a-large-scale-12-lead-electrocardiogram-database-for-arrhythmia-study-1.0.0/a-large-scale-12-lead-electrocardiogram-database-for-arrhythmia-study-1.0.0\WFDBRecords\01\019\JS01052: time data '/' does not match format '%d/%m/%Y'
Failed to read C:/Users/Eva/Downloads/a-large-scale-12-lead-electrocardiogram-database-for-arrhythmia-study-1.0.0/a-large-scale-12-lead-electrocardiogram-database-for-arrhythmia-study-1.0.0\WFDBRecords\23\236\JS23074: list index out of range


In [11]:
# Parse the diagnosis codes of every loaded record and read the code/name
# table from the default CSV file ('Nombres.csv').
diag, names, code = get_diag(meta)

In [12]:
# Keep only the first listed diagnosis code of each record.
flatDiag = flatten(diag)

In [13]:
# Select the codes that occur more than 1000 times, plus the full names of
# those that appear in the code table.
rNames, rCodes = freqCount(flatDiag, names, code)

In [14]:
# Build (signal, code) pairs for the records whose first diagnosis code is
# one of the frequent codes.
totList = get_signals(rCodes, meta, signals)

In [15]:
# Group the pairs by class: `conj` holds the per-class label lists and
# `conjS` the matching per-class signal lists.
conj, conjS = OrderedLists(totList) 

In [27]:
# Maximum number of samples taken from each class, and the seed that makes
# the train/validation/test split reproducible across kernel restarts.
MAX_PER_CLASS = 1000
SPLIT_SEED = 42

Xtest_list = []
Xtrain_list = []
Ytrain_list = []
Ytest_list = []
Xval_list = []
Yval_list = []
# Split each class separately so every subset gets the same class proportions.
for class_labels, class_signals in zip(conj, conjS):
    labels_capped = class_labels[:MAX_PER_CLASS]
    signals_capped = class_signals[:MAX_PER_CLASS]

    # 20% held out for test; 25% of the remaining 80% goes to validation,
    # giving a 60/20/20 train/validation/test split.
    Xtrain, Xtest, Ytrain, Ytest = train_test_split(
        signals_capped, labels_capped, test_size=0.2, random_state=SPLIT_SEED
    )
    Xtrain, Xval, Ytrain, Yval = train_test_split(
        Xtrain, Ytrain, test_size=0.25, random_state=SPLIT_SEED
    )

    Xtrain_list.extend(Xtrain)
    Xval_list.extend(Xval)
    Xtest_list.extend(Xtest)
    Ytrain_list.extend(Ytrain)
    Ytest_list.extend(Ytest)
    # Validation labels must come from the validation split, not the test split.
    Yval_list.extend(Yval)
    
