# Detectar arritmias cardíacas mediante señales de ECG parcialmente etiquetadas
### INF395 Introducción a las Redes Neuronales and Deep Learning
- Estudiante: Alessandro Bruno Cintolesi Rodríguez
- ROL: 202173541-0

## **1. Librerías**

In [None]:
# === General / Utilidad ===
from datetime import datetime
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from typing import List, Optional, Dict

# === PyTorch, PyTorch Lightning ===
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.optim.lr_scheduler import LambdaLR, CosineAnnealingLR
import pytorch_lightning as pl

# === Scikit-learn ===
from sklearn.cluster import KMeans
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import (
	f1_score, accuracy_score, recall_score, confusion_matrix,
	roc_auc_score, classification_report
)

## **2. Variables globales**

In [None]:
# Class index -> display label (Spanish strings shown in plot titles and reports)
CLASSES = {
	0: "(N) Latido normal",
	1: "(S) Latido supraventricular",
	2: "(V) Latido ventricular ectópico",
	3: "(F) Latido de fusión",
	4: "(Q) Latido desconocido"
}

# Revised hyperparameters
SEED = 42            # seed passed to torch / numpy / Lightning and to KMeans
SIGNALS = 187        # series length handed to ECGTimeAugment(series_len=...)
EMBEDDING_DIM = 256  # encoder output size (emb_dim)
PROJ_HID = 256       # projection head hidden width
PROJ_OUT = 128       # projection head output width

NUM_WORKERS = 0      # DataLoader workers for every loader in this notebook
BATCH_SSL = 512      # batch size: contrastive pre-training
BATCH_LP = 256       # batch size: linear probe
BATCH_FT = 256       # batch size: fine-tuning

EPOCHS_SSL = 100     # max epochs: contrastive pre-training
EPOCHS_LP = 25       # max epochs: linear probe
EPOCHS_FT = 40       # max epochs: fine-tuning (also T_max of its cosine schedule)

LR_SSL = 1e-3        # learning rate: contrastive pre-training
LR_LP = 2e-3         # learning rate: linear probe head
LR_FT_HEAD = 1e-3    # learning rate: fine-tuning, classification head
LR_FT_ENC = 1e-4     # learning rate: fine-tuning, encoder

WD = 1e-4            # AdamW weight decay shared by all stages
TEMP = 0.15          # NT-Xent temperature

# Alias for NumPy arrays; not referenced elsewhere in the visible notebook
ArrayLike = np.ndarray

## **3. Setup del Dispositivo**

In [None]:
# Seed every RNG in use (torch, numpy, and Lightning's own seeding helper)
torch.manual_seed(SEED)
np.random.seed(SEED)
pl.seed_everything(SEED, workers=True)

# Pick the compute device: CUDA when available, CPU otherwise
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# NOTE(review): cudnn.benchmark enables cuDNN autotuning, which may make runs
# non-reproducible despite the seeding above — confirm this trade-off is intended.
torch.backends.cudnn.benchmark = True
print("Using device:", DEVICE)
if DEVICE.type == "cuda":
	print("GPU:", torch.cuda.get_device_name(0))

## **4. Funciones Auxiliares**

In [None]:
def plot_ecg(X, y, aug=False, aug_title=""):
	"""Plot a single ECG series with its class name in the title.

	Args:
		X: 1-D sequence of amplitude samples to draw.
		y: Class key looked up in ``CLASSES``; unknown keys are shown as
			"Sin clasificar".
		aug: When truthy, ``aug_title`` is appended to the title after " | ".
		aug_title: Extra title text, only used when ``aug`` is truthy.
	"""
	label = CLASSES.get(y, "Sin clasificar")
	if aug:
		heading = f"ECG Clase: {label} | {aug_title}"
	else:
		heading = f"ECG Clase: {label}"

	# Draw on the current axes, exactly what the pyplot shortcuts target.
	ax = plt.gca()
	ax.plot(X)
	ax.set_title(heading)
	ax.set_xlabel("Tiempo (muestras)")
	ax.set_ylabel("Amplitud")
	plt.show()

In [None]:
def plot_clusters(X, clusters, n_clusters):
	"""Plot the mean signal of each cluster on one shared figure.

	Args:
		X: Array of series, indexed by sample along axis 0.
		clusters: Per-sample cluster ids, aligned with the rows of ``X``.
		n_clusters: Number of cluster ids to draw (ids ``0 .. n_clusters - 1``).
	"""
	ax = plt.gca()
	for cluster_id in range(n_clusters):
		members = X[clusters == cluster_id]
		ax.plot(members.mean(axis=0), label=f"Cluster {cluster_id}")

	ax.legend()
	ax.set_title("Promedio de señal por cluster")
	ax.set_xlabel("Muestra")
	ax.set_ylabel("Amplitud promedio")
	ax.grid(True)
	plt.show()

## **5. Análisis Exploratorio de Datos**

In [None]:
# Load the training and test splits from their CSV files
train_df = pd.read_csv("ecg_signals/train_semi_supervised.csv")
test_df = pd.read_csv("ecg_signals/test_semi_supervised.csv")

In [None]:
# Split each frame into X (time series) and y (label, taken from the last column)
X_train = train_df.iloc[:, :-1].values
y_train = train_df.iloc[:, -1].values

# NOTE(review): unlike the training frame, the test frame also drops its first
# column — presumably an ID/index column; confirm against the CSV layout.
X_test = test_df.iloc[:, 1:-1].values
y_test = test_df.iloc[:, -1].values

In [None]:
# Split the training data into labeled and unlabeled subsets.
# A sample counts as unlabeled when its label is NaN (float/complex dtypes only)
# or equal to the sentinel value -1.
y_train_np = np.asarray(y_train)
mask_unl = np.isnan(y_train_np.astype(float, copy=False)) if y_train_np.dtype.kind in "fc" else np.zeros_like(y_train_np, dtype=bool)
mask_unl |= (y_train_np == -1)

mask_lab = ~mask_unl

X_lab_train = X_train[mask_lab]
y_lab_train = y_train_np[mask_lab].astype(int)
X_unl_train = X_train[mask_unl]

# Number of distinct classes observed among the labeled samples
num_classes = int(np.unique(y_lab_train).size)
# Series length (columns of X_train); not referenced elsewhere in the visible notebook
T = X_train.shape[1] 

In [None]:
# Plot one ECG from the training set (row 1) with its label
plot_ecg(X=X_train[1], y=y_train[1])

In [None]:
# Cluster all training series (labeled and unlabeled) into 5 groups with K-means
train_kmeans = KMeans(n_clusters=5, random_state=SEED)
train_clusters = train_kmeans.fit_predict(X_train)

In [None]:
# Show the mean signal of each of the 5 K-means clusters
plot_clusters(X=X_train, clusters=train_clusters, n_clusters=5)

In [None]:
# Count samples per label value and print each count with its share of the training set
unique, counts = np.unique(y_train, return_counts=True)
for c, n in zip(unique, counts):
	# NaN labels cannot be cast to int, so they are printed as-is
	if not np.isnan(c):
		c = int(c)
	print(f"Clase {c}: {n} muestras ({n/len(y_train)*100:.2f}%)")

In [None]:
# Size of the weight vector (one slot per class id 0..4)
n_classes = 5

# Keep only real labels: drop NaN and negative (sentinel) values
y_valid = y_train[~np.isnan(y_train)]
y_valid = y_valid[y_valid >= 0].astype(int)

print("Clases en y_valid:", np.unique(y_valid))

# "balanced" weights are computed only for the classes actually present
present_classes = np.unique(y_valid)
weights_partial = compute_class_weight(
	class_weight="balanced",
	classes=present_classes,
	y=y_valid
)

# Scatter the weights into a fixed-size vector; classes with no labeled
# samples keep a weight of 0
full_weights = np.zeros(n_classes, dtype=np.float32)
for c, w in zip(present_classes, weights_partial):
	full_weights[int(c)] = float(w)

# Tensor on DEVICE, used as the `weight` argument of the cross-entropy losses below
class_weights = torch.tensor(full_weights, dtype=torch.float32).to(DEVICE)
print("Pesos por clase (0..4):", class_weights.cpu().numpy())

## **6. Augmentaciones para TimeCLR**

In [None]:
class ECGTimeAugment:
	"""Random augmentation pipeline for a single 1-D series stored as a NumPy array.

	Each call applies, in this order and each with its own probability:
	crop-and-resample, time masking, amplitude scaling and additive jitter.
	Every step returns an array with the same length and dtype as its input.

	Window sizes for cropping and masking are derived from the length of the
	array actually passed in, so the augmenter also works when that length
	differs from ``series_len``.

	Args:
		series_len: Nominal series length, stored as ``self.T``.
		p_jitter: Probability of adding Gaussian noise.
		jitter_sigma: Standard deviation of the additive noise.
		p_scaling: Probability of multiplying by a random gain.
		scaling_sigma: Standard deviation of the gain (mean 1.0).
		p_tmask: Probability of zeroing a contiguous window.
		tmask_frac: ``(low, high)`` fraction of the series to zero out.
		p_crop: Probability of cropping a window and resampling it back.
		crop_frac: ``(low, high)`` fraction of the series kept by the crop.
		use_perm: Stored on the instance; not used by any step in this class.
	"""

	def __init__(
		self,
		series_len: int = 128,
		p_jitter: float = 0.7,		jitter_sigma: float = 0.01,   # mild noise
		p_scaling: float = 0.6,		scaling_sigma: float = 0.07,  # mild gain
		p_tmask: float = 0.4,		tmask_frac=(0.03, 0.08),      # short cutout
		p_crop: float = 0.6,		crop_frac=(0.7, 0.95),        # moderate crop
		use_perm: bool = False
	):
		self.T = series_len
		self.p_jitter = p_jitter
		self.jitter_sigma = jitter_sigma
		self.p_scaling = p_scaling
		self.scaling_sigma = scaling_sigma
		self.p_tmask = p_tmask
		self.tmask_frac = tmask_frac
		self.p_crop = p_crop
		self.crop_frac = crop_frac
		self.use_perm = use_perm

	def _jitter(self, x):
		"""Add element-wise Gaussian noise and cast back to the input dtype."""
		y = x + np.random.normal(0.0, self.jitter_sigma, size=x.shape)
		return y.astype(x.dtype, copy=False)

	def _scaling(self, x):
		"""Multiply the whole series by one random gain drawn around 1.0."""
		y = x * np.random.normal(1.0, self.scaling_sigma)
		return y.astype(x.dtype, copy=False)

	def _time_mask(self, x):
		"""Return a copy of ``x`` with one random contiguous window set to zero."""
		n = x.shape[0]
		# Copy in the input dtype: avoids mutating the caller's array and avoids
		# a lossy float32 round-trip for float64 inputs.
		y = x.copy()
		w = max(1, int(n * np.random.uniform(*self.tmask_frac)))
		s = np.random.randint(0, max(1, n - w + 1))
		y[s:s+w] = 0
		return y

	def _crop(self, x):
		"""Cut a random window and linearly resample it back to ``len(x)``."""
		n = x.shape[0]
		if n < 2:
			# Nothing to interpolate between.
			return x
		# Clamp to [2, n] so the slice below always has exactly `w` samples.
		w = min(n, max(2, int(n * np.random.uniform(*self.crop_frac))))
		s = np.random.randint(0, n - w + 1)
		seg = x[s:s+w]
		i_old = np.linspace(0, 1, num=w)
		i_new = np.linspace(0, 1, num=n)
		y = np.interp(i_new, i_old, seg)   # np.interp yields float64
		return y.astype(x.dtype, copy=False)

	def __call__(self, x):
		"""Apply the random pipeline to ``x`` and return the augmented array.

		When no step fires, ``x`` itself is returned (not a copy).
		"""
		y = x
		if np.random.rand() < self.p_crop:
			y = self._crop(y)
		if np.random.rand() < self.p_tmask:
			y = self._time_mask(y)
		if np.random.rand() < self.p_scaling:
			y = self._scaling(y)
		if np.random.rand() < self.p_jitter:
			y = self._jitter(y)
		return y
	
# Shared augmenter instance configured with the notebook's series length (SIGNALS)
ecgtime_augment = ECGTimeAugment(series_len=SIGNALS)

In [None]:
# Augment one training series (row 1) and plot the result with its label
aug = ecgtime_augment(X_train[1])
plot_ecg(X=aug, y=y_train[1], aug=True, aug_title="Augmentación")

## **7. Dataset para TimeCLR**

In [None]:
class TimeCLRDataset(Dataset):
	"""Dataset that yields two independently transformed views of each series.

	Args:
		X: Array-like of shape ``(N, T)`` or ``(N, 1, T)``; stored as float32 ``(N, T)``.
		transform: Optional callable applied separately to build each view.
		eps: Added to the standard deviation when z-normalising a view.

	Raises:
		ValueError: If ``X`` has any other shape.
	"""

	def __init__(self, X, transform=None, eps=1e-6):
		data = np.asarray(X, dtype=np.float32)
		is_flat = data.ndim == 2
		is_single_channel = data.ndim == 3 and data.shape[1] == 1
		if not (is_flat or is_single_channel):
			raise ValueError("X debe ser (N, T) o (N, 1, T)")
		# Drop the singleton channel axis so samples are always 1-D.
		self.X = data if is_flat else data[:, 0, :]
		self.T = self.X.shape[1]
		self.transform = transform
		self.eps = eps

	def __len__(self):
		return len(self.X)

	def _z(self, x):
		"""Z-normalise one series using its own mean and standard deviation."""
		centered = x - x.mean()
		return centered / (x.std() + self.eps)

	def __getitem__(self, idx: int):
		"""Return ``(view_1, view_2)``, each a float32 tensor of shape ``(1, T)``."""
		sample = self.X[idx]

		# Build both raw views first (the transform is called twice, in order).
		raw_views = []
		for _ in range(2):
			raw_views.append(self.transform(sample) if self.transform else sample)

		# Normalise, force float32 and add the channel axis.
		first, second = (
			torch.from_numpy(self._z(view).astype(np.float32, copy=False)).unsqueeze(0)
			for view in raw_views
		)
		return first, second

class LabeledECGDataset(Dataset):
	"""Dataset of ``(series, label)`` pairs with per-sample z-normalisation.

	Args:
		X: Array-like of series, one per row; stored as float32.
		y: Array-like of integer labels aligned with ``X``; stored as int64.
		eps: Added to the standard deviation when z-normalising a series.
	"""

	def __init__(self, X, y, eps=1e-6):
		self.X = np.asarray(X, dtype=np.float32)
		self.y = np.asarray(y, dtype=np.int64)
		self.eps = eps

	def __len__(self): return self.X.shape[0]

	def _z(self, x):
		"""Z-normalise one series using its own mean and standard deviation."""
		m, s = x.mean(), x.std()
		# Honour the configured eps (it was previously ignored in favour of 1e-6).
		return (x - m) / (s + self.eps)

	def __getitem__(self, i):
		"""Return ``(x, y)``: ``x`` with a leading channel axis, ``y`` a scalar tensor."""
		x = torch.from_numpy(self._z(self.X[i])).unsqueeze(0)
		y = torch.tensor(self.y[i])
		return x, y

class UnlabeledECGDataset(Dataset):
	"""Dataset of unlabeled series with per-sample z-normalisation.

	Args:
		X: Array-like of series, one per row; stored as float32.
		eps: Added to the standard deviation when z-normalising a series.
	"""

	def __init__(self, X, eps=1e-6):
		self.X = np.asarray(X, dtype=np.float32)
		self.eps = eps

	def __len__(self): return self.X.shape[0]

	def _z(self, x):
		"""Z-normalise one series using its own mean and standard deviation."""
		m, s = x.mean(), x.std()
		# Honour the configured eps (it was previously ignored in favour of 1e-6).
		return (x - m) / (s + self.eps)

	def __getitem__(self, i):
		"""Return the normalised series with a leading channel axis."""
		x = torch.from_numpy(self._z(self.X[i])).unsqueeze(0)
		return x

## **8. Backbone de TimeCLR**

In [None]:
class CNN1DBackbone(nn.Module):
	"""1-D convolutional encoder: three strided conv stages, global pooling, linear layer.

	Args:
		in_ch: Number of input channels.
		emb_dim: Size of the output embedding.
	"""

	def __init__(self, in_ch: int = 1, emb_dim: int = 256):
		super().__init__()
		# (out_channels, kernel_size) per stage; every stage uses stride 2 and
		# padding kernel_size // 2, followed by BatchNorm and ReLU.
		stage_specs = ((64, 7), (128, 5), (256, 3))

		layers = []
		channels = in_ch
		for width, kernel in stage_specs:
			layers += [
				nn.Conv1d(channels, width, kernel_size=kernel, stride=2, padding=kernel // 2, bias=False),
				nn.BatchNorm1d(width),
				nn.ReLU(inplace=True),
			]
			channels = width
		# Collapse the time axis to a single position.
		layers.append(nn.AdaptiveAvgPool1d(1))

		self.feat = nn.Sequential(*layers)
		self.fc = nn.Linear(channels, emb_dim)

	def forward(self, x: torch.Tensor) -> torch.Tensor:
		"""Map ``x`` through the conv stack, drop the pooled axis, and project to ``emb_dim``."""
		pooled = self.feat(x).squeeze(-1)
		return self.fc(pooled)

## **9. ProjectionHead de TimeCLR**

In [None]:
class ProjectionHead(nn.Module):
	"""Two-layer MLP: Linear (no bias) -> BatchNorm -> ReLU -> Linear.

	Args:
		in_dim: Size of the incoming embedding.
		hid_dim: Width of the hidden layer.
		out_dim: Size of the projected output.
	"""

	def __init__(self, in_dim: int, hid_dim: int = 256, out_dim: int = 128):
		super().__init__()
		hidden = nn.Linear(in_dim, hid_dim, bias=False)
		norm = nn.BatchNorm1d(hid_dim)
		output = nn.Linear(hid_dim, out_dim, bias=True)
		self.net = nn.Sequential(hidden, norm, nn.ReLU(inplace=True), output)

	def forward(self, x: torch.Tensor) -> torch.Tensor:
		"""Project ``x`` through the MLP."""
		projected = self.net(x)
		return projected

## **10. LossFunction para TimeCLR**

In [None]:
def nt_xent_loss(z1: torch.Tensor, z2: torch.Tensor, temperature: float = 0.2) -> torch.Tensor:
	# Normaliza
	z1 = F.normalize(z1, dim=1)
	z2 = F.normalize(z2, dim=1)

	B = z1.size(0)

	Z = torch.cat([z1, z2], dim=0)

	if Z.dtype in (torch.float16, torch.bfloat16):
		Zm = Z.float()
	else:
		Zm = Z

	sim = torch.matmul(Zm, Zm.t()) / temperature

	mask = torch.eye(2 * B, dtype=torch.bool, device=Z.device)
	sim = sim.masked_fill(mask, float('-inf'))

	target = torch.cat([
		torch.arange(B, 2 * B, device=Z.device),
		torch.arange(0, B, device=Z.device)
	], dim=0)

	loss = F.cross_entropy(sim, target)
	return loss

## **11. Modelo de TimeCLR**

In [None]:
class TimeCLRModel(pl.LightningModule):
	"""Contrastive pre-training module: CNN encoder + projection head + NT-Xent loss.

	Args:
		emb_dim: Encoder output size.
		proj_hid: Hidden width of the projection head.
		proj_out: Output width of the projection head.
		temperature: Temperature passed to ``nt_xent_loss``.
		lr: Peak learning rate for AdamW.
		weight_decay: AdamW weight decay.
	"""

	def __init__(
		self,
		emb_dim: int = EMBEDDING_DIM,
		proj_hid: int = PROJ_HID,
		proj_out: int = PROJ_OUT,
		temperature: float = TEMP,
		lr: float = LR_SSL,
		weight_decay: float = WD
	):
		super().__init__()
		self.save_hyperparameters()
		self.encoder = CNN1DBackbone(in_ch=1, emb_dim=emb_dim)
		self.projector = ProjectionHead(emb_dim, proj_hid, proj_out)
		self.temperature = temperature
		self.lr = lr
		self.weight_decay = weight_decay

	def forward(self, x: torch.Tensor) -> torch.Tensor:
		"""Return the projected embedding of ``x`` (encoder followed by projector)."""
		h = self.encoder(x)
		z = self.projector(h)
		return z

	def training_step(self, batch, batch_idx):
		"""Compute the NT-Xent loss between the two views contained in ``batch``."""
		x1, x2 = batch
		z1 = self(x1)
		z2 = self(x2)
		loss = nt_xent_loss(z1, z2, self.temperature)
		self.log("train_loss", loss, prog_bar=True, on_step=True, on_epoch=True)
		return loss

	def configure_optimizers(self):
		"""AdamW with a per-step schedule: linear warm-up (10 %) then cosine decay.

		The total number of optimizer steps comes from the public
		``Trainer.estimated_stepping_batches`` property rather than from private
		fit-loop attributes. If that total is not finite (e.g. unbounded
		training), the optimizer is returned without a scheduler.
		"""
		opt = torch.optim.AdamW(self.parameters(), lr=self.lr, weight_decay=self.weight_decay)

		total_steps = self.trainer.estimated_stepping_batches
		if not math.isfinite(total_steps) or total_steps <= 0:
			return opt
		total_steps = int(total_steps)
		warmup_steps = int(0.1 * total_steps)

		def warmup_then_cosine(step):
			if step < warmup_steps:
				return step / max(1, warmup_steps)
			progress = (step - warmup_steps) / max(1, total_steps - warmup_steps)
			# Clamp so the factor stays at 0 instead of rising again if training
			# runs past the estimated number of steps.
			progress = min(1.0, progress)
			return 0.5 * (1 + math.cos(math.pi * progress))

		scheduler = LambdaLR(opt, lr_lambda=warmup_then_cosine)

		return {"optimizer": opt, "lr_scheduler": {"scheduler": scheduler, "interval": "step"}}

	@torch.no_grad()
	def encode(self, x: torch.Tensor, normalize: bool = True) -> torch.Tensor:
		"""Return encoder embeddings (no projector), optionally L2-normalised per row.

		Runs without gradient tracking; the module's train/eval mode is not changed.
		"""
		h = self.encoder(x)
		if normalize:
			h = F.normalize(h, dim=1)
		return h

## **12. Pre-Entrenamiento SimCLR**

In [None]:
# Contrastive pre-training uses every training series (labeled and unlabeled)
X_all_train = X_train
ssl_ds = TimeCLRDataset(
	X=X_all_train,
	transform=ecgtime_augment
)
ssl_dl = DataLoader(
	ssl_ds,
	batch_size=BATCH_SSL,
	shuffle=True,
	num_workers=NUM_WORKERS,
	pin_memory=True
)

timeclr_model = TimeCLRModel(
	emb_dim=EMBEDDING_DIM,
	proj_hid=PROJ_HID,
	proj_out=PROJ_OUT,
	temperature=TEMP,
	lr=LR_SSL,
	weight_decay=WD
)

# NOTE(review): steps_per_epoch and warmup_steps are not read again in the
# visible notebook; the model computes its own schedule in configure_optimizers.
steps_per_epoch = max(1, len(ssl_dl))
warmup_steps = int(0.1 * EPOCHS_SSL * steps_per_epoch)
# Logs the learning rate at every optimizer step
lr_monitor = pl.callbacks.LearningRateMonitor(logging_interval="step")

# Mixed precision is only requested when a GPU is available
trainer = pl.Trainer(
	max_epochs=EPOCHS_SSL,
	accelerator="gpu" if torch.cuda.is_available() else "cpu",
	devices=1,
	precision="16-mixed" if torch.cuda.is_available() else 32,
	benchmark=True,
	log_every_n_steps=10,
	gradient_clip_val=1.0,
	callbacks=[lr_monitor]
)
trainer.fit(timeclr_model, ssl_dl)

# Keep only the encoder (projection head discarded) and switch it to eval mode
encoder = timeclr_model.encoder.eval()

## **13. LinearProbe**

In [None]:
# Labeled datasets for the supervised stages.
# NOTE(review): y_test.astype(int) assumes every test label is a finite number — confirm.
train_ds = LabeledECGDataset(X_lab_train, y_lab_train)
test_ds  = LabeledECGDataset(X_test, y_test.astype(int))

# Training batches are shuffled; test batches keep the dataset order
train_dl = DataLoader(
	train_ds,
	batch_size=BATCH_LP,
	shuffle=True,
	num_workers=NUM_WORKERS
)
test_dl = DataLoader(
	test_ds,
	batch_size=BATCH_LP,
	shuffle=False,
	num_workers=NUM_WORKERS
)

class LinearProbe(pl.LightningModule):
	"""Linear classifier trained on top of a frozen encoder.

	Args:
		encoder: Pre-trained module mapping a batch of series to embeddings. Its
			parameters are set to ``requires_grad=False`` IN PLACE, so any other
			module sharing this encoder object sees them frozen as well.
		n_classes: Number of output classes.
		lr: AdamW learning rate for the linear head.
		wd: AdamW weight decay for the linear head.
	"""

	def __init__(self, encoder, n_classes: int, lr: float = LR_LP, wd: float = WD):
		super().__init__()
		self.encoder = encoder
		for p in self.encoder.parameters():
			p.requires_grad = False
		# Frozen features also require eval mode, otherwise BatchNorm running
		# statistics keep changing during probing.
		self.encoder.eval()
		# Use the encoder's real embedding size when it exposes an `fc` layer;
		# fall back to the previous hard-coded 256 otherwise.
		emb_dim = getattr(getattr(encoder, "fc", None), "out_features", 256)
		self.head = nn.Linear(emb_dim, n_classes)
		self.lr, self.wd = lr, wd

	def train(self, mode: bool = True):
		"""Switch the module's mode but always keep the frozen encoder in eval mode."""
		super().train(mode)
		self.encoder.eval()
		return self

	def forward(self, x):
		"""Return class logits; the encoder runs without gradient tracking."""
		with torch.no_grad():
			h = self.encoder(x)
		return self.head(h)

	def training_step(self, batch, _):
		"""Class-weighted cross-entropy on one labeled batch."""
		x, y = batch
		logits = self(x)
		# NOTE(review): `class_weights` is a notebook-level tensor; its length must
		# equal n_classes — confirm when some class has no labeled samples.
		loss = F.cross_entropy(logits, y, weight=class_weights)
		self.log("lp_train_loss", loss, prog_bar=True)
		return loss

	def test_step(self, batch, _):
		"""Log the accuracy of one test batch."""
		x, y = batch
		logits = self(x)
		pred = logits.argmax(1)
		acc = (pred == y).float().mean()
		self.log("lp_test_acc", acc, prog_bar=True)

	def configure_optimizers(self):
		"""Optimise only the linear head."""
		return torch.optim.AdamW(self.head.parameters(), lr=self.lr, weight_decay=self.wd)

# Train the linear probe on the labeled training data, then report test accuracy
lp = LinearProbe(
	encoder,
	n_classes=num_classes,
	lr=LR_LP, 
	wd=WD
)
trainer = pl.Trainer(
	max_epochs=EPOCHS_LP,
	accelerator="gpu" if torch.cuda.is_available() else "cpu",
	devices=1
)
trainer.fit(lp, train_dl)
trainer.test(lp, test_dl)

## **14. FineTuning TimeCLR**

In [None]:
class FineTune(pl.LightningModule):
	"""End-to-end fine-tuning of a pre-trained encoder plus a new linear head.

	Args:
		encoder: Pre-trained module mapping a batch of series to embeddings.
		n_classes: Number of output classes.
		lr_enc: AdamW learning rate for the encoder parameters.
		lr_head: AdamW learning rate for the classification head.
		wd: AdamW weight decay for both parameter groups.
	"""

	def __init__(self, encoder, n_classes: int, lr_enc=LR_FT_ENC, lr_head=LR_FT_HEAD, wd=WD):
		super().__init__()
		self.encoder = encoder
		# The encoder may arrive frozen and in eval mode (e.g. after being used
		# by a linear probe that disables its gradients in place). Re-enable
		# both so that the encoder is actually fine-tuned.
		for p in self.encoder.parameters():
			p.requires_grad = True
		self.encoder.train()
		# Use the encoder's real embedding size when it exposes an `fc` layer;
		# fall back to the previous hard-coded 256 otherwise.
		emb_dim = getattr(getattr(encoder, "fc", None), "out_features", 256)
		self.head = nn.Linear(emb_dim, n_classes)
		self.lr_enc, self.lr_head, self.wd = lr_enc, lr_head, wd

	def forward(self, x):
		"""Return class logits for a batch of series."""
		h = self.encoder(x)
		return self.head(h)

	def training_step(self, batch, _):
		"""Class-weighted cross-entropy on one labeled batch."""
		x, y = batch
		logits = self(x)
		# NOTE(review): `class_weights` is a notebook-level tensor; its length must
		# equal n_classes — confirm when some class has no labeled samples.
		loss = F.cross_entropy(logits, y, weight=class_weights)
		self.log("ft_train_loss", loss, prog_bar=True)
		return loss

	def validation_step(self, batch, _):
		"""Log the accuracy of one validation batch."""
		x, y = batch
		logits = self(x)
		pred = logits.argmax(1)
		acc = (pred == y).float().mean()
		self.log("ft_val_acc", acc, prog_bar=True)

	def test_step(self, batch, _):
		"""Log the accuracy of one test batch."""
		x, y = batch
		logits = self(x)
		pred = logits.argmax(1)
		acc = (pred == y).float().mean()
		self.log("ft_test_acc", acc, prog_bar=True)

	def configure_optimizers(self):
		"""AdamW with separate learning rates for encoder and head, cosine-annealed per epoch."""
		params = [
			{"params": self.encoder.parameters(), "lr": self.lr_enc, "weight_decay": self.wd},
			{"params": self.head.parameters(),    "lr": self.lr_head, "weight_decay": self.wd},
		]
		opt = torch.optim.AdamW(params)
		sch = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=EPOCHS_FT)
		return {"optimizer": opt, "lr_scheduler": sch}

# Hold out 20 % of the labeled data (stratified by class) for validation.
# NOTE(review): random_state is the literal 42, which equals SEED.
from sklearn.model_selection import train_test_split
X_tr, X_val, y_tr, y_val = train_test_split(X_lab_train, y_lab_train, test_size=0.2, stratify=y_lab_train, random_state=42)

dl_tr = DataLoader(
	LabeledECGDataset(X_tr, y_tr),
	batch_size=BATCH_FT,
	shuffle=True,
	num_workers=NUM_WORKERS
)
dl_val = DataLoader(
	LabeledECGDataset(X_val, y_val),
	batch_size=BATCH_FT,
	shuffle=False,
	num_workers=NUM_WORKERS
)
dl_te = DataLoader(
	test_ds,
	batch_size=BATCH_FT,
	shuffle=False,
	num_workers=NUM_WORKERS
)

# Stop after 10 checks without improvement in validation accuracy, and keep
# the single best checkpoint by that same metric
early_stop = pl.callbacks.EarlyStopping(monitor="ft_val_acc", mode="max", patience=10)
ckpt = pl.callbacks.ModelCheckpoint(monitor="ft_val_acc", mode="max", save_top_k=1)

# Fine-tune starting from the same encoder object used by the linear probe
ft = FineTune(encoder, n_classes=num_classes)
trainer = pl.Trainer(
	max_epochs=EPOCHS_FT,
	accelerator="gpu" if torch.cuda.is_available() else "cpu",
	devices=1,
	callbacks=[early_stop, ckpt]
)
trainer.fit(ft, dl_tr, dl_val)
# NOTE(review): no ckpt_path is passed, so this presumably evaluates the weights
# currently in memory rather than the best checkpoint tracked by `ckpt` — confirm.
trainer.test(ft, dl_te)	

## **15. Evaluación del Modelo**

In [None]:
@torch.no_grad()
def evaluate_all_metrics(
	model: torch.nn.Module,
	dataloader,
	num_classes: int,
	class_names: Optional[List[str]] = None,
	device: Optional[torch.device] = None,
	normalize_cm: bool = True
) -> Dict[str, object]:
	"""Run ``model`` over ``dataloader`` and compute and print classification metrics.

	The model is put in eval mode (and left there). Class ids are assumed to be
	``0 .. num_classes - 1``.

	Args:
		model: Classifier returning logits of shape ``(batch, n_outputs)``.
		dataloader: Iterable yielding ``(x, y)`` pairs (list or tuple of length 2).
		num_classes: Number of classes used for per-class metrics and matrices.
		class_names: Optional display names; only used when its length equals
			``num_classes``.
		device: Device for inference; defaults to the device of the model's
			first parameter.
		normalize_cm: Whether to also compute the row-normalised confusion matrix.

	Returns:
		Dict with keys ``f1_macro``, ``accuracy``, ``recall_per_class``,
		``confusion_matrix``, ``confusion_matrix_normalized`` (``None`` when
		``normalize_cm`` is false), ``roc_auc_macro`` (``None`` when it cannot
		be computed) and ``classification_report``.

	Raises:
		ValueError: If a batch is not an ``(x, y)`` pair or the dataloader is empty.
	"""
	model.eval()
	if device is None:
		device = next(model.parameters()).device

	label_ids = np.arange(num_classes)

	all_preds = []
	all_probs = []
	all_true  = []

	for batch in dataloader:
		if isinstance(batch, (list, tuple)) and len(batch) == 2:
			x, y = batch
		else:
			raise ValueError("Dataloader debe entregar (x, y).")

		x = x.to(device)
		y = y.to(device)

		logits = model(x)
		probs  = F.softmax(logits, dim=1)
		preds  = probs.argmax(1)

		all_true.append(y.cpu().numpy())
		all_preds.append(preds.cpu().numpy())
		all_probs.append(probs.cpu().numpy())

	if not all_true:
		# Fail with an explicit message instead of np.concatenate's generic one.
		raise ValueError("Dataloader vacío: no hay muestras para evaluar.")

	y_true = np.concatenate(all_true, axis=0)
	y_pred = np.concatenate(all_preds, axis=0)
	y_prob = np.concatenate(all_probs, axis=0)

	acc     = accuracy_score(y_true, y_pred)
	f1_mac  = f1_score(y_true, y_pred, average="macro", zero_division=0)
	rec_per_class = recall_score(y_true, y_pred, average=None, labels=label_ids, zero_division=0)

	target_names = class_names if (class_names is not None and len(class_names) == num_classes) else None
	# Pin `labels` so the report always has one row per class; without it,
	# `target_names` may not match the classes that happen to be present.
	cls_report = classification_report(
		y_true, y_pred,
		labels=label_ids,
		target_names=target_names,
		zero_division=0,
		digits=4
	)

	cm = confusion_matrix(y_true, y_pred, labels=label_ids)
	if normalize_cm:
		# Rows with no true samples divide by zero; those cells become 0.
		with np.errstate(all="ignore"):
			cm_norm = cm / cm.sum(axis=1, keepdims=True)
			cm_norm = np.nan_to_num(cm_norm)
	else:
		cm_norm = None

	roc_auc_macro = None
	roc_auc_error = None
	try:
		unique_test_classes = np.unique(y_true)
		if num_classes == 2:
			if set(unique_test_classes.tolist()) == {0, 1}:
				roc_auc_macro = roc_auc_score(y_true, y_prob[:, 1])
			else:
				# Remap the observed labels to 0..k-1 and score the largest one as positive.
				classes_sorted = np.sort(unique_test_classes)
				mapper = {c:i for i, c in enumerate(classes_sorted)}
				y_true_bin = np.vectorize(mapper.get)(y_true)
				pos_index = np.where(classes_sorted == classes_sorted.max())[0][0]
				roc_auc_macro = roc_auc_score(y_true_bin, y_prob[:, pos_index])
		else:
			if len(unique_test_classes) >= 2:
				roc_auc_macro = roc_auc_score(y_true, y_prob, average="macro", multi_class="ovr")
			else:
				roc_auc_macro = None
	except (ValueError, IndexError) as e:
		# ROC-AUC is best-effort, but keep the reason so it can be reported.
		roc_auc_macro = None
		roc_auc_error = e

	results = {
		"f1_macro": f1_mac,
		"accuracy": acc,
		"recall_per_class": rec_per_class,
		"confusion_matrix": cm,
		"confusion_matrix_normalized": cm_norm,
		"roc_auc_macro": roc_auc_macro,
		"classification_report": cls_report,
	}

	# Human-friendly summary
	print("\n=== Evaluación ===")
	print(f"F1-macro: {f1_mac:.4f}")
	print(f"Accuracy: {acc:.4f}")
	if roc_auc_macro is not None:
		print(f"ROC-AUC macro (OVR): {roc_auc_macro:.4f}")
	else:
		print("ROC-AUC macro: no disponible (clases ausentes o caso no válido).")
		if roc_auc_error is not None:
			print(f"  Motivo: {roc_auc_error}")

	print("\nRecall por clase:")
	if target_names is not None:
		for i, r in enumerate(rec_per_class):
			print(f"  {target_names[i]}: {r:.4f}")
	else:
		for i, r in enumerate(rec_per_class):
			print(f"  clase {i}: {r:.4f}")

	print("\nReporte por clase:\n", cls_report)
	print("Matriz de confusión (cruda):\n", cm)
	if cm_norm is not None:
		print("Matriz de confusión (normalizada por fila):\n", np.round(cm_norm, 4))

	return results

In [None]:
# Recompute the number of classes from the real training labels only: NaN and
# negative sentinel values (e.g. -1) mark unlabeled samples and must not be
# counted as classes.
_train_labels = np.asarray(y_train, dtype=float)
_train_labels = _train_labels[~np.isnan(_train_labels)]
num_classes = int(np.unique(_train_labels[_train_labels >= 0]).size)

# Display names, in class-id order
class_names = [
	"(N) Latido normal",
	"(S) Latido supraventricular",
	"(V) Latido ventricular ectópico",
	"(F) Latido de fusión",
	"(Q) Latido desconocido"
]

# Evaluate the fine-tuned model on the test loader and print every metric
results = evaluate_all_metrics(
	model=ft,
	dataloader=test_dl,
	num_classes=num_classes,
	class_names=class_names
)

## **16. Escritura del CSV**

In [None]:
@torch.no_grad()
def write_predictions_csv(model, test_dl, out_path="preds.csv"):
	"""Predict a class for every sample in ``test_dl`` and save the result as CSV.

	The file has two columns: ``ID`` (running index starting at 0, in loader
	order) and ``label`` (arg-max class). The model is put in eval mode.

	Args:
		model: Classifier returning logits of shape ``(batch, n_classes)``.
		test_dl: Iterable of batches; each batch is either the input tensor itself
			or a list/tuple whose first element is the input tensor.
		out_path: Destination CSV path.
	"""
	model.eval()
	target_device = next(model.parameters()).device

	labels = []
	for item in test_dl:
		# Accept both (x, ...) batches and bare input tensors.
		wraps_inputs = isinstance(item, (list, tuple)) and len(item) >= 1
		inputs = item[0] if wraps_inputs else item

		scores = F.softmax(model(inputs.to(target_device)), dim=1)
		labels += scores.argmax(1).cpu().numpy().tolist()

	# One consecutive id per predicted sample.
	table = pd.DataFrame({"ID": list(range(len(labels))), "label": labels})
	table.to_csv(out_path, index=False)
	print(f"CSV de predicciones guardado en: {out_path}")

# Save the fine-tuned model's test predictions to a timestamped CSV file
write_predictions_csv(
	ft,
	test_dl,
	out_path=f"timeclr_predictions_{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.csv"
)