In [8]:
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from ecgdetectors import Detectors
from scipy.signal import find_peaks
from tcn import TCN

In [9]:
def min_max(x, x_min, x_max):
    """Linearly rescale ``x`` so that ``x_min`` maps to 0 and ``x_max`` maps to 1.

    Works on scalars and on NumPy arrays (element-wise). Values of ``x``
    outside ``[x_min, x_max]`` are not clipped, so they map outside ``[0, 1]``.

    Args:
        x: value(s) to rescale.
        x_min: value that should map to 0.
        x_max: value that should map to 1.

    Returns:
        ``(x - x_min) / (x_max - x_min)``, same shape as ``x``.
    """
    span = x_max - x_min
    offset = x - x_min
    return offset / span

def load_ecg(file_path, t_path, segment_len=2048, label_radius=3, tail=21):
    """Load one ECG recording plus its T marks and cut both into fixed-length segments.

    The ECG file is read as whitespace-separated text: the first line is
    skipped and the third column of every following non-blank line is taken as
    the signal. The marks file is read with ``np.loadtxt`` as integers and
    every third value, starting at index 2, is kept as a T mark
    (NOTE(review): presumably the file stores triplets per beat and the third
    entry is the sample index of interest -- confirm against the annotation tool).

    Args:
        file_path: path to the ECG text file.
        t_path: path to the text file holding the integer marks.
        segment_len: number of samples per output segment.
        label_radius: samples on each side of a T mark that are labelled 1.
        tail: samples kept after the last T mark before truncating the signal.

    Returns:
        Tuple ``(ecg_2, t_marks, labels)`` where ``ecg_2`` is a float32 array of
        shape ``(n_segments, segment_len)``, ``t_marks`` is the 1-D array of
        selected mark indices (relative to the un-segmented signal) and
        ``labels`` is an int array with the same shape as ``ecg_2``.

    Raises:
        ValueError: if the marks file holds fewer than three values, so that no
            T mark can be selected.
    """
    samples = []
    with open(file_path, 'r') as f:
        next(f, None)  # the first line is always skipped (treated as a header)
        for line in f:
            fields = line.split()
            if not fields:
                # Tolerate blank lines (typically a trailing newline at EOF).
                continue
            samples.append(np.float32(fields[2]))
    ecg_2 = np.asarray(samples, dtype=np.float32)

    # ndmin=1 keeps a single-value file from collapsing into a 0-d array.
    all_marks = np.loadtxt(t_path, dtype=int, ndmin=1)
    # Slicing never indexes past the end, unlike an explicit index array built
    # from the length, so a count that is not a multiple of 3 is handled.
    t_marks = all_marks[2::3]
    if t_marks.size == 0:
        raise ValueError(
            f"{t_path}: need at least 3 marks to select a T mark, got {all_marks.size}"
        )

    labels = np.zeros(len(ecg_2), dtype=int)
    # Mark a window of +-label_radius samples around each T mark as positive.
    for t in t_marks:
        labels[max(0, t - label_radius):min(len(labels), t + label_radius + 1)] = 1

    # Drop everything after the last annotated beat (plus a short tail), then
    # trim to a whole number of segments so the reshape below is exact.
    ecg_2 = ecg_2[:t_marks[-1] + tail]
    limit = len(ecg_2) - len(ecg_2) % segment_len
    ecg_2 = ecg_2[:limit].reshape(limit // segment_len, segment_len)
    labels = labels[:limit].reshape(limit // segment_len, segment_len)
    return ecg_2, t_marks, labels

In [10]:
# Root folder of the raw recordings; change this single constant when the data
# lives somewhere else instead of editing every call below.
DATA_DIR = '/home/david/Documents/ECG_delineation/ecg-sanos/wetransfer_ecgs_2024-09-09_1340'
# Fixed seed so the shuffle (and therefore the saved train/val/test files) is
# identical on every Restart & Run All.
SPLIT_SEED = 42

ecg_bruno, t_bruno, labels_bruno = load_ecg(f'{DATA_DIR}/377-Bruno/Estudio.vak', 'Serie_Bruno.txt')
ecg_mario, t_mario, labels_mario = load_ecg(f'{DATA_DIR}/ecg-mario/Estudio.vak', 'Serie_Mario.txt')
ecg_leo, t_leo, labels_leo = load_ecg(f'{DATA_DIR}/390-Leopoldo-Diagno/Estudio.vak', 'Serie_Leo.txt')
ecg_julia, t_julia, labels_julia = load_ecg(f'{DATA_DIR}/ecg-julia/Estudio.vak', 'Serie_Julia.txt')
ecg_seba, t_seba, labels_seba = load_ecg(f'{DATA_DIR}/368Seba-Diagno/Estudio.vak', 'Serie_Seba_corte_1.txt')

# Stack the per-subject segments into one (n_segments, segment_len) array each.
ecg = np.concatenate((ecg_bruno, ecg_mario, ecg_leo, ecg_julia, ecg_seba), axis=0)
labels = np.concatenate((labels_bruno, labels_mario, labels_leo, labels_julia, labels_seba), axis=0)
assert len(ecg) == len(labels), "every ECG segment needs a matching label segment"

# Shuffle ecg and labels in unison (same permutation applied to both).
p = np.random.default_rng(SPLIT_SEED).permutation(len(ecg))
ecg = ecg[p]
# NOTE(review): min/max are taken over ALL segments, including those that end
# up in the validation/test splits -- confirm this is intended.
ecg = min_max(ecg, np.min(ecg), np.max(ecg))
labels = labels[p]

# 75 % train / 15 % validation / 10 % test, split along the segment axis.
n_segments = ecg.shape[0]
train_end = int(0.75 * n_segments)
val_end = int(0.9 * n_segments)

ecg_train, labels_train = ecg[:train_end], labels[:train_end]
ecg_val, labels_val = ecg[train_end:val_end], labels[train_end:val_end]
ecg_test, labels_test = ecg[val_end:], labels[val_end:]

# Save each (ecg, labels) pair in its own uncompressed .npz file.
np.savez('ecg_data_train.npz', ecg=ecg_train, labels=labels_train)
np.savez('ecg_data_val.npz', ecg=ecg_val, labels=labels_val)
np.savez('ecg_data_test.npz', ecg=ecg_test, labels=labels_test)

# Example of loading a split back:
# data = np.load('ecg_data_train.npz')
# ecg_train = data['ecg']
# labels_train = data['labels']

In [None]:
# --- Inputs from the data-preparation cell: ecg_train / labels_train and
# --- ecg_val / labels_val, each of shape (n_segments, seq_len). ---
pos_count = np.sum(labels_train)
neg_count = labels_train.shape[0]*labels_train.shape[1] - pos_count
print("pos_count:", pos_count, "neg_count:", neg_count, "ratio neg/pos:", (neg_count/ (pos_count+1e-9)))

# 1) Build a per-timestep sample weight.
# The first `warmup` timesteps of every segment get weight 0, where `warmup`
# is RF - 1 and RF is computed from the TCN hyper-parameters below.
kernel_size = 3
dilations = [1,2,4,8,16,32]
nb_stacks = 1
RF = 1 + 2 * (kernel_size - 1) * np.sum(dilations) * nb_stacks
warmup = int(RF) - 1
# Not consumed below (the warm-up is applied directly on sample_weight_seq);
# kept so that any later cell relying on the name keeps working.
warmup_mask = np.ones(warmup, dtype='float32')

# Class weighting: give positives a larger weight.
# pos_weight = neg_count / pos_count, clamped to 50 to avoid huge values.
pos_weight = float(min(50.0, (neg_count / max(1, pos_count))))
class_weights_per_timestep = np.where(labels_train==1, pos_weight, 1.0).astype('float32')

# Start from the class weights and zero out the warm-up region. A plain slice
# covers every case: warmup == 0 touches nothing and warmup >= seq_len zeroes
# the whole sequence.
sample_weight_seq = class_weights_per_timestep.copy()  # shape (n_samples, seq_len)
sample_weight_seq[:, :warmup] = 0.0

# 2) Add the trailing channel dimension expected by the model.
ecg_train = ecg_train.reshape((-1, ecg_train.shape[1], 1)).astype('float32')
labels_train = labels_train.reshape((-1, labels_train.shape[1], 1)).astype('float32')
sample_weight_seq = sample_weight_seq.reshape((-1, sample_weight_seq.shape[1], 1)).astype('float32')

print("Using ecg_train shape:", ecg_train.shape, "labels shape:", labels_train.shape, "sample_weight shape:", sample_weight_seq.shape)

# 3) tf.data pipelines. The training set yields (x, y, sample_weight) triples;
# the validation set yields unweighted (x, y) pairs.
batch_size = 8
ds_train = tf.data.Dataset.from_tensor_slices((ecg_train, labels_train, sample_weight_seq))
ds_train = ds_train.shuffle(200).batch(batch_size).prefetch(tf.data.AUTOTUNE)

ecg_val = ecg_val.reshape((-1, ecg_val.shape[1], 1)).astype('float32')
labels_val = labels_val.reshape((-1, labels_val.shape[1], 1)).astype('float32')
ds_val = tf.data.Dataset.from_tensor_slices((ecg_val, labels_val))
ds_val = ds_val.batch(batch_size).prefetch(tf.data.AUTOTUNE)

# 4) Model: causal TCN followed by a per-timestep sigmoid (probabilities, not
# logits), trained with binary cross-entropy and the sample weights above.
inp = tf.keras.Input(shape=(None, 1))
x = TCN(
    nb_filters=32,
    kernel_size=kernel_size,
    dilations=dilations,
    nb_stacks=nb_stacks,
    padding='causal',
    dropout_rate=0.1,
    return_sequences=True,
    use_skip_connections=True
)(inp)
out = tf.keras.layers.Dense(1, activation='sigmoid')(x)
model = tf.keras.Model(inp, out)

opt = tf.keras.optimizers.Adam(learning_rate=1e-4)  # conservative learning rate
model.compile(optimizer=opt,
              loss='binary_crossentropy',
              metrics=[tf.keras.metrics.AUC(name='auc'),
                       tf.keras.metrics.Precision(name='precision'),
                       tf.keras.metrics.Recall(name='recall')])

# Callbacks.
# NOTE(review): both callbacks monitor the (weighted) training loss although a
# validation set is supplied -- confirm whether 'val_loss' was intended.
callbacks = [
    tf.keras.callbacks.EarlyStopping(monitor='loss', patience=6, restore_best_weights=True),
    tf.keras.callbacks.ReduceLROnPlateau(monitor='loss', factor=0.5, patience=3, min_lr=1e-7)
]

# 5) Train. Keras' Model.fit takes the training dataset as its first argument
# (`x`) and the validation dataset through `validation_data`.
history = model.fit(ds_train, validation_data=ds_val, epochs=30, callbacks=callbacks, verbose=2)
