<a href="https://colab.research.google.com/github/Zfeng0207/FIT3199-FYP/blob/dev%2Fzfeng/multi_label_lstm_classifier.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Starting the Notebook



## Loading Dependencies

In [30]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [31]:
import os
os.chdir('/content/drive/MyDrive/Colab Notebooks/ECG-MIMIC-main')

In [32]:
# !pip install -qqqq mlflow torchmetrics pytorch_lightning iterative-stratification

In [33]:
import mlflow

# Medical Multi-Label Dataset Preparation

In [37]:
def encode_label(label: list, num_classes=10):
    """This functions converts labels into one-hot encoding"""

    target = torch.zeros(num_classes)
    for l in str(label).split(","):
        target[int(l)] = 1.0
    return target

# Configuration Classes

In [38]:
from dataclasses import dataclass, asdict

@dataclass
class DatasetConfig:
    """Configuration for the dataset."""
    memmap_meta_path: str = "src/data/memmap/memmap_meta.npz"
    memmap_path: str = "src/data/memmap/memmap.npy"
    df_diag_path: str = "src/data/records_w_diag_icd10.csv"
    df_memmap_pkl_path: str = "src/data/memmap/df_memmap.pkl"
    df_labels_path: str = "src/data/label_df.csv"

@dataclass
class TrainingConfig:
    """Configuration for training."""
    batch_size: int = 32
    learning_rate: float = 1e-3
    epochs: int = 10
    model_type: str = "RNNAttentionModel"  # or "CNN", "LSTM", etc.
    # Add other training-related parameters as needed

In [39]:
dataset_config = DatasetConfig()
training_config = TrainingConfig(batch_size=64, epochs=20)

# ECG Initialization

### ECG Dataset

In [70]:
import torch
from torch.utils.data import Dataset

class ECGDataset(Dataset):
    def __init__(self, memmap, X, y):
        self.df = X.reset_index(drop=True)
        self.memmap = memmap
        self.y = y

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
      row = self.df.loc[idx]
      start = row['start']
      length = row['length']
      signal = self.memmap[start : start + length * 12]  # 12 features per timestep
      signal = (signal - signal.mean(axis=0)) / (signal.std(axis=0) + 1e-6)
      signal = signal.reshape(length, 12)
      label = row['res']
      # Convert signal to PyTorch tensor before checking for NaN/inf
      signal = torch.tensor(signal, dtype=torch.float32)
      # if torch.isnan(signal).any() or torch.isinf(signal).any():
      #   return None
      label = np.fromstring(label[1:-1], sep=' ', dtype=np.float64)  # Removing brackets and converting to float64 or int as needed
      label = torch.tensor(label, dtype=torch.long)  # Now convert to PyTorch tensor

      return signal, torch.tensor(label, dtype=torch.long) # signal is already a tensor


### ECG Data Module

In [76]:
from torch.utils.data import DataLoader
import pytorch_lightning as pl

class ECGDataModule(pl.LightningDataModule):
    def __init__(self, memmap, X_train, y_train, X_val, y_val, X_test, y_test, batch_size=32):
        super().__init__()
        self.memmap = memmap
        self.X_train = X_train
        self.y_train = y_train
        self.X_val = X_val
        self.y_val = y_val
        self.X_test = X_test
        self.y_test = y_test
        self.batch_size = batch_size

    def setup(self, stage=None):
        self.train_dataset = ECGDataset(self.memmap, self.X_train, self.y_train)
        self.val_dataset = ECGDataset(self.memmap, self.X_val, self.y_val)
        self.test_dataset = ECGDataset(self.memmap, self.X_test, self.y_test)

    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True, num_workers=11, pin_memory=True)

    def val_dataloader(self):
        return DataLoader(self.val_dataset, batch_size=self.batch_size,  num_workers=11, pin_memory=True)

    def test_dataloader(self):
        return DataLoader(self.test_dataset, batch_size=self.batch_size, num_workers=11, pin_memory=True)


## Swish

In [43]:
import pytorch_lightning as pl
import torch

class Swish(pl.LightningModule):
    def __init__(self):
        super().__init__()

    def forward(self, x):
        return x * torch.sigmoid(x)

## ConvNormPool

In [44]:
class ConvNormPool(pl.LightningModule):
    """Conv Skip-connection module"""
    def __init__(
        self,
        input_size,
        hidden_size,
        kernel_size,
        norm_type='bachnorm'
    ):
        super().__init__()

        self.kernel_size = kernel_size
        self.conv_1 = nn.Conv1d(
            in_channels=input_size,
            out_channels=hidden_size,
            kernel_size=kernel_size
        )
        self.conv_2 = nn.Conv1d(
            in_channels=hidden_size,
            out_channels=hidden_size,
            kernel_size=kernel_size
        )
        self.conv_3 = nn.Conv1d(
            in_channels=hidden_size,
            out_channels=hidden_size,
            kernel_size=kernel_size
        )
        self.swish_1 = Swish()
        self.swish_2 = Swish()
        self.swish_3 = Swish()
        if norm_type == 'group':
            self.normalization_1 = nn.GroupNorm(
                num_groups=8,
                num_channels=hidden_size
            )
            self.normalization_2 = nn.GroupNorm(
                num_groups=8,
                num_channels=hidden_size
            )
            self.normalization_3 = nn.GroupNorm(
                num_groups=8,
                num_channels=hidden_size
            )
        else:
            self.normalization_1 = nn.BatchNorm1d(num_features=hidden_size)
            self.normalization_2 = nn.BatchNorm1d(num_features=hidden_size)
            self.normalization_3 = nn.BatchNorm1d(num_features=hidden_size)

        self.pool = nn.MaxPool1d(kernel_size=2)

    def forward(self, input):
        conv1 = self.conv_1(input)
        x = self.normalization_1(conv1)
        x = self.swish_1(x)
        x = F.pad(x, pad=(self.kernel_size - 1, 0))

        x = self.conv_2(x)
        x = self.normalization_2(x)
        x = self.swish_2(x)
        x = F.pad(x, pad=(self.kernel_size - 1, 0))

        conv3 = self.conv_3(x)
        x = self.normalization_3(conv1+conv3)
        x = self.swish_3(x)
        x = F.pad(x, pad=(self.kernel_size - 1, 0))

        x = self.pool(x)
        return x


# CNN

In [45]:
class CNN(pl.LightningModule):
    def __init__(
        self,
        input_size = 1,
        hid_size = 256,
        kernel_size = 5,
        num_classes = 5,
    ):

        super().__init__()

        self.conv1 = ConvNormPool(
            input_size=input_size,
            hidden_size=hid_size,
            kernel_size=kernel_size,
        )
        self.conv2 = ConvNormPool(
            input_size=hid_size,
            hidden_size=hid_size//2,
            kernel_size=kernel_size,
        )
        self.conv3 = ConvNormPool(
            input_size=hid_size//2,
            hidden_size=hid_size//4,
            kernel_size=kernel_size,
        )
        self.avgpool = nn.AdaptiveAvgPool1d((1))
        self.fc = nn.Linear(in_features=hid_size//4, out_features=num_classes)

    def forward(self, input):
        x = self.conv1(input)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.avgpool(x)
        # print(x.shape) # num_features * num_channels
        x = x.view(-1, x.size(1) * x.size(2))
        x = F.softmax(self.fc(x), dim=1)
        return x


# RNN

In [46]:
class RNN(pl.LightningModule):
    """RNN module(cell type lstm or gru)"""
    def __init__(
        self,
        input_size,
        hid_size,
        num_rnn_layers=1,
        dropout_p = 0.2,
        bidirectional = False,
        rnn_type = 'lstm',
    ):
        super().__init__()

        if rnn_type == 'lstm':
            self.rnn_layer = nn.LSTM(
                input_size=input_size,
                hidden_size=hid_size,
                num_layers=num_rnn_layers,
                dropout=dropout_p if num_rnn_layers>1 else 0,
                bidirectional=bidirectional,
                batch_first=True,
            )

        else:
            self.rnn_layer = nn.GRU(
                input_size=input_size,
                hidden_size=hid_size,
                num_layers=num_rnn_layers,
                dropout=dropout_p if num_rnn_layers>1 else 0,
                bidirectional=bidirectional,
                batch_first=True,
            )
    def forward(self, input):
        outputs, hidden_states = self.rnn_layer(input)
        return outputs, hidden_states


# RNN Model

In [47]:
class RNNModel(pl.LightningModule):
    def __init__(
        self,
        input_size,
        hid_size,
        rnn_type,
        bidirectional,
        n_classes=5,
        kernel_size=5,
    ):
        super().__init__()

        self.rnn_layer = RNN(
            input_size=46,#hid_size * 2 if bidirectional else hid_size,
            hid_size=hid_size,
            rnn_type=rnn_type,
            bidirectional=bidirectional
        )
        self.conv1 = ConvNormPool(
            input_size=input_size,
            hidden_size=hid_size,
            kernel_size=kernel_size,
        )
        self.conv2 = ConvNormPool(
            input_size=hid_size,
            hidden_size=hid_size,
            kernel_size=kernel_size,
        )
        self.avgpool = nn.AdaptiveAvgPool1d((1))
        self.fc = nn.Linear(in_features=hid_size, out_features=n_classes)

    def forward(self, input):
        x = self.conv1(input)
        x = self.conv2(x)
        x, _ = self.rnn_layer(x)
        x = self.avgpool(x)
        x = x.view(-1, x.size(1) * x.size(2))
        x = F.sigmoid(self.fc(x), dim=1)#.squeeze(1)
        return x


# RNN Attention Model

In [92]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl
from torchmetrics.classification import MultilabelAccuracy, MultilabelF1Score, MultilabelAUROC

class RNNAttentionModel(pl.LightningModule):
    def __init__(
        self,
        hid_size =64,
        rnn_type = 'lstm',
        bidirectional=False,
        num_classes=12,
        input_size =12,
        kernel_size=5,
        lr=1e-3,
        f1_metric_threshold=0.5,
    ):
        super().__init__()
        self.save_hyperparameters()

        self.conv1 = ConvNormPool(
            input_size=input_size,
            hidden_size=hid_size,
            kernel_size=kernel_size,
        )
        self.conv2 = ConvNormPool(
            input_size=hid_size,
            hidden_size=hid_size,
            kernel_size=kernel_size,
        )

        self.rnn_layer = RNN(
            input_size=hid_size,
            hid_size=hid_size,
            rnn_type=rnn_type,
            bidirectional=bidirectional
        )

        self.attn = nn.Linear(hid_size, hid_size, bias=False)
        self.fc = nn.Linear(in_features=hid_size, out_features=num_classes)  # Multi-label output
        self.loss_fn = nn.BCEWithLogitsLoss()
        self.lr = lr

        # Metrics
        self.train_acc = MultilabelAccuracy(num_labels=num_classes, threshold=f1_metric_threshold)
        self.train_f1 = MultilabelF1Score(num_labels=num_classes, average="macro", threshold=f1_metric_threshold)
        self.train_auc = MultilabelAUROC(num_labels=num_classes)

        self.val_acc = MultilabelAccuracy(num_labels=num_classes, threshold=f1_metric_threshold)
        self.val_f1 = MultilabelF1Score(num_labels=num_classes, average="macro", threshold=f1_metric_threshold)
        self.val_auc = MultilabelAUROC(num_labels=num_classes)

    def forward(self, input):
        input = input.permute(0, 2, 1)
        x = self.conv1(input)
        x = self.conv2(x)
        x = x.permute(0, 2, 1)

        x_out, _ = self.rnn_layer(x)

        attn_weights = torch.softmax(self.attn(x_out), dim=1)
        x = torch.sum(attn_weights * x_out, dim=1)

        logits = self.fc(x)
        return logits

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.loss_fn(logits, y.float())
        probs = torch.sigmoid(logits)

        acc = self.train_acc(probs, y.int())
        f1 = self.train_f1(probs, y.int())
        auc = self.train_auc(probs, y.int())

        self.log("train_loss", loss, prog_bar=True)
        self.log("train_acc", acc, prog_bar=True)
        self.log("train_f1", f1, prog_bar=True)
        self.log("train_auc", auc, prog_bar=True)

        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.loss_fn(logits, y.float())
        probs = torch.sigmoid(logits)

        acc = self.val_acc(probs, y.int())
        f1 = self.val_f1(probs, y.int())
        auc = self.val_auc(probs, y.int())

        self.log("val_loss", loss, prog_bar=True)
        self.log("val_acc", acc, prog_bar=True)
        self.log("val_f1", f1, prog_bar=True)
        self.log("val_auc", auc, prog_bar=True)

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.lr)

In [49]:
## Safe collate

In [50]:
# from torch.nn.utils.rnn import pad_sequence
# def safe_collate(batch):
#     # Filter out None entries
#     batch = [b for b in batch if b is not None]
#     if len(batch) == 0:
#         return None  # Skip entire batch if empty (optional, or raise Exception)

#     signals, labels = zip(*batch)
#     signals = pad_sequence(signals, batch_first=True)  # if variable-length ECG
#     labels = torch.tensor(labels)
#     return signals, labels


# Simple LSTM Model

In [51]:
import torch
import torch.nn as nn
import pytorch_lightning as pl
import torch.nn.functional as F
from torchmetrics.classification import BinaryF1Score, BinaryAUROC


class LSTMClassifier(pl.LightningModule):
    def __init__(self, input_size=12, hidden_size=64, num_layers=2, lr=1e-3):
        super().__init__()
        self.save_hyperparameters()

        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True
        )

        self.train_f1 = BinaryF1Score()
        self.val_f1 = BinaryF1Score()
        self.test_f1 = BinaryF1Score()

        self.train_auc = BinaryAUROC()
        self.val_auc = BinaryAUROC()
        self.test_auc = BinaryAUROC()

        self.fc = nn.Linear(hidden_size * 2, 1)  # bidirectional
        self.loss_fn = nn.BCEWithLogitsLoss(pos_weight=torch.tensor([pos_weight]).to(self.device))

    def forward(self, x):
        # x: (B, T, C) → needs to be (B, T, 12)
        out, _ = self.lstm(x)
        out = out[:, -1, :]  # take last timestep
        logits = self.fc(out)
        return logits.squeeze()

    def on_train_start(self):
      # Log model type as a parameter or tag
      mlflow.pytorch.log_model(self, "model") # Registers the model
      mlflow.log_param("model_type", "LSTM")  # Log as parameter
      mlflow.set_tag("model_type", "LSTM")

    def training_step(self, batch, batch_idx):
        self.train()
        x, y = batch
        x = x.to(self.device)
        y = y.to(self.device)
        logits = self(x)
        loss = self.loss_fn(logits, y.float())
        probs = torch.sigmoid(logits)
        preds = probs > 0.5
        auc = self.train_auc(probs, y.int())
        acc = (preds == y).float().mean()
        f1 = self.train_f1(preds, y)
        self.log("train_loss", loss, prog_bar=True)
        self.log("train_acc", acc, prog_bar=True)
        self.log("train_f1", f1, prog_bar=True)
        self.log("train_auc", auc, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.loss_fn(logits, y.float())
        probs = torch.sigmoid(logits)
        preds = probs > 0.5
        auc = self.train_auc(probs, y.int())
        acc = (preds == y).float().mean()
        f1 = self.val_f1(preds, y)

        self.log("val_loss", loss, prog_bar=True)
        self.log("val_acc", acc, prog_bar=True)
        self.log("val_f1", f1, prog_bar=True)
        self.log("val_auc", auc, prog_bar=True)

        return loss


    def test_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = self.loss_fn(logits, y.float())
        probs = torch.sigmoid(logits)
        preds = probs > 0.5
        auc = self.train_auc(probs, y.int())

        acc = (preds == y).float().mean()
        f1 = self.test_f1(preds, y)

        self.log("test_loss", loss, prog_bar=True)
        self.log("test_acc", acc, prog_bar=True)
        self.log("test_f1", f1, prog_bar=True)
        self.log("test_auc", auc, prog_bar=True)

        return loss

    # def configure_gradient_clipping(
    #     self,
    #     optimizer=None,
    #     optimizer_idx=None,
    #     gradient_clip_val=None,
    #     gradient_clip_algorithm=None
    # ):
    #     if optimizer is not None:
    #         torch.nn.utils.clip_grad_norm_(self.parameters(), max_norm=1.0)



    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-4)  # Reduced lr


## Data Sampling

In [53]:
# Step 1: Load your full dataframe
df = df_labels.copy()

# Step 2: Prepare your multi-hot label matrix (columns with labels only)
label_cols = ["res"]  # your real label columns
Y_wrong = df[label_cols].values
Y = np.vstack([np.fromstring(row[0][1:-1], sep=' ') for row in Y_wrong])


In [54]:
Y

array([[0., 0., 0., ..., 0., 1., 0.],
       [0., 0., 0., ..., 1., 0., 0.],
       [0., 1., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]])

In [55]:
import pandas as pd
import numpy as np
from iterstrat.ml_stratifiers import MultilabelStratifiedKFold, MultilabelStratifiedShuffleSplit

# Step 1: Load your full dataframe
df = df_labels.copy()

# Step 2: Prepare your multi-hot label matrix (columns with labels only)
label_cols = ["res"]  # your real label columns
Y_wrong = df[label_cols].values
Y = np.vstack([np.fromstring(row[0][1:-1], sep=' ') for row in Y_wrong])

# Step 3: Initialize splitter
splitter = MultilabelStratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)

# Step 4: First split into train_val and test
train_val_idx, test_idx = next(splitter.split(df, Y))

df_train_val = df.iloc[train_val_idx].reset_index(drop=True)
df_test = df.iloc[test_idx].reset_index(drop=True)

# Step 5: Now split train_val into train and val
splitter_val = MultilabelStratifiedShuffleSplit(n_splits=1, test_size=0.1, random_state=42)  # 10% of train_val for val

# **Change here**: Use the same logic as creating Y for Y_train_val
Y_train_val_wrong = df_train_val[label_cols].values  # Get the "res" column values
Y_train_val = np.vstack([np.fromstring(row[0][1:-1], sep=' ', dtype=np.float64) for row in Y_train_val_wrong])  # Make multi-hot like you did for Y


train_idx, val_idx = next(splitter_val.split(df_train_val, Y_train_val))

df_train = df_train_val.iloc[train_idx].reset_index(drop=True)
df_val = df_train_val.iloc[val_idx].reset_index(drop=True)

# Step 6: Result
print("Train shape:", df_train.shape)
print("Validation shape:", df_val.shape)
print("Test shape:", df_test.shape)

# If you want to pass to Dataset later:
X_train, y_train = df_train, df_train[label_cols].values
X_val, y_val = df_val, df_val[label_cols].values
X_test, y_test = df_test, df_test[label_cols].values

Train shape: (214740, 12)
Validation shape: (23861, 12)
Test shape: (59651, 12)


In [57]:
# from sklearn.utils.class_weight import compute_class_weight

# labels = y_train
# class_weights = compute_class_weight(class_weight='balanced', classes=np.array([0, 1]), y=labels)

# # class_weights[1] is the weight for positive class
# pos_weight = class_weights[1] / class_weights[0]  # Convert to ratio


Model and data Initialization

In [61]:
memmap_data = np.memmap(memmap_path, dtype=np.float32, mode='r')

In [81]:
# Create the data module
ecg_dm = ECGDataModule(
    memmap=memmap_data,
    X_train = X_train,
    X_val = X_val,
    X_test = X_test,
    y_train = y_train,
    y_val = y_val,
    y_test = y_test,
    batch_size=64
)

## Setting up Mlflow for model baseline tracking

# Model Training

In [65]:
# from pytorch_lightning import Trainer
# from pytorch_lightning.loggers import MLFlowLogger
# import mlflow

# import mlflow
# import mlflow.pytorch
# from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
# import os

# os.environ['MLFLOW_TRACKING_USERNAME'] = "Zfeng0207"
# os.environ['MLFLOW_TRACKING_PASSWORD'] = "af7c8365aec4d3ff7a40563a35ec94d4bc9b4512"
# os.environ['MLFLOW_TRACKING_PROJECTNAME'] = "stroke-prediction-dagshub-repo"
# # Setup
# experiment_name = "lstm-ecg"
# tracking_uri = f"https://dagshub.com/{os.environ['MLFLOW_TRACKING_USERNAME']}/{os.environ['MLFLOW_TRACKING_PROJECTNAME']}.mlflow"

# mlflow.set_tracking_uri(tracking_uri)
# mlflow.set_experiment(experiment_name)

# print(f"MLflow tracking experiment name: {experiment_name}")
# print(f"Tracking URI: {tracking_uri}")

# # Use same URI in logger
# mlf_logger = MLFlowLogger(
#     experiment_name=experiment_name,
#     tracking_uri=tracking_uri,
#     log_model=True
# )

In [93]:
model = RNNAttentionModel()
# model = RNNModel(12, 64, 'lstm', True)
# model = CNN(num_classes=5, hid_size=128)
# model = LSTMClassifier(input_size=12)

In [94]:
from pytorch_lightning.callbacks import ModelCheckpoint

model_checkpoint = ModelCheckpoint(
    monitor="valid/f1",
    mode="max",
    filename="ckpt_{epoch:03d}-vloss_{valid/loss:.4f}_vf1_{valid/f1:.4f}",
    auto_insert_metric_name=False,
)


# Update the Trainer to use the callback
trainer = pl.Trainer(
    # logger=mlf_logger,
    max_epochs=1,
    gradient_clip_val=1.0,
    # callbacks=[checkpoint_callback]  # Add callback here
)

INFO:pytorch_lightning.utilities.rank_zero:Using default `ModelCheckpoint`. Consider installing `litmodels` package to enable `LitModelCheckpoint` for automatic upload to the Lightning model registry.
INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs


In [95]:
# ... (your existing code) ...

trainer.fit(model, datamodule=ecg_dm)
trainer.validate(model, datamodule=ecg_dm)  # Add this line to run validation
trainer.test(model, datamodule=ecg_dm)

# ... (the rest of your code) ...

INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
   | Name      | Type               | Params | Mode 
----------------------------------------------------------
0  | conv1     | ConvNormPool       | 45.4 K | train
1  | conv2     | ConvNormPool       | 62.0 K | train
2  | rnn_layer | RNN                | 33.3 K | train
3  | attn      | Linear             | 4.1 K  | train
4  | fc        | Linear             | 780    | train
5  | loss_fn   | BCEWithLogitsLoss  | 0      | train
6  | train_acc | MultilabelAccuracy | 0      | train
7  | train_f1  | MultilabelF1Score  | 0      | train
8  | train_auc | MultilabelAUROC    | 0      | train
9  | val_acc   | MultilabelAccuracy | 0      | train
10 | val_f1    | MultilabelF1Score  | 0      | train
11 | val_auc   | MultilabelAUROC    | 0      | train
----------------------------------------------------------
145 K     Trainable params
0         Non-trainable params
145 

Sanity Checking: |          | 0/? [00:00<?, ?it/s]

  return signal, torch.tensor(label, dtype=torch.long) # signal is already a tensor
  return signal, torch.tensor(label, dtype=torch.long) # signal is already a tensor
  return signal, torch.tensor(label, dtype=torch.long) # signal is already a tensor
  return signal, torch.tensor(label, dtype=torch.long) # signal is already a tensor
  return signal, torch.tensor(label, dtype=torch.long) # signal is already a tensor
  return signal, torch.tensor(label, dtype=torch.long) # signal is already a tensor
  return signal, torch.tensor(label, dtype=torch.long) # signal is already a tensor
  return signal, torch.tensor(label, dtype=torch.long) # signal is already a tensor
  return signal, torch.tensor(label, dtype=torch.long) # signal is already a tensor
  return signal, torch.tensor(label, dtype=torch.long) # signal is already a tensor
  return signal, torch.tensor(label, dtype=torch.long) # signal is already a tensor
  return signal, torch.tensor(label, dtype=torch.long) # signal is already a

Training: |          | 0/? [00:00<?, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:
Detected KeyboardInterrupt, attempting graceful shutdown ...


NameError: name 'exit' is not defined

In [None]:
trainer.test(model, datamodule=ecg_dm)

## Evaluation Metrics

# Model Testing

In [None]:
from sklearn.metrics import (
    confusion_matrix, ConfusionMatrixDisplay, roc_curve, auc,
    accuracy_score, f1_score, roc_auc_score
)
import matplotlib.pyplot as plt

# After test finishes
y_true = model.all_targets.numpy()
y_pred = model.all_preds.numpy()
y_prob = model.all_probs.numpy()

# Confusion Matrix
cm = confusion_matrix(y_true, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot(cmap='Blues')
plt.title("Confusion Matrix")
plt.show()

# ROC Curve
fpr, tpr, _ = roc_curve(y_true, y_prob)
roc_auc = auc(fpr, tpr)

plt.figure()
plt.plot(fpr, tpr, label=f"ROC curve (AUC = {roc_auc:.2f})")
plt.plot([0, 1], [0, 1], linestyle='--', color='gray')
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("Receiver Operating Characteristic (ROC)")
plt.legend()
plt.grid(True)
plt.show()

# Metrics
acc = accuracy_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)
roc_auc_score_val = roc_auc_score(y_true, y_prob)

print(f"Accuracy: {acc:.4f}")
print(f"F1 Score: {f1:.4f}")
print(f"AUC-ROC Score: {roc_auc_score_val:.4f}")


In [None]:
import pandas as pd

# Create lists to hold signals and labels
signals = []
labels = []

for idx in range(len(df)):
    start = df.loc[idx, 'start']
    length = df.loc[idx, 'length']

    # Get and normalize the signal
    raw = memmap_data[start : start + length * 12]
    normed = (raw - raw.mean()) / (raw.std() + 1e-6)

    if np.isnan(normed).any() or np.isinf(normed).any():
        continue  # skip bad sample

    signal = normed.reshape(length, 12)
    signals.append(signal)
    labels.append(df.loc[idx, 'Stroke_YN'])

# Create DataFrame
df_signals = pd.DataFrame({
    "signal": signals,  # Each row is a 2D numpy array (object dtype)
    "label": labels
})