### Mount Google Drive

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Change Directory
%cd '/content/drive/My Drive/dsp/project'

/content/drive/My Drive/dsp/project


### Install Required Libraries


In [3]:
pip install tensorboardX

Collecting tensorboardX
  Downloading tensorboardX-2.6.2.2-py2.py3-none-any.whl.metadata (5.8 kB)
Downloading tensorboardX-2.6.2.2-py2.py3-none-any.whl (101 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/101.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m101.7/101.7 kB[0m [31m5.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: tensorboardX
Successfully installed tensorboardX-2.6.2.2


In [9]:
import argparse
import os
import time
import torch
import numpy as np
from tensorboardX import SummaryWriter
import torch.nn as nn
import torch.nn.functional as F
import os
import argparse
import torch
import time
import numpy as np
from sklearn import preprocessing
from imblearn.over_sampling import RandomOverSampler
from torch.utils.data import DataLoader
import random
from sklearn.metrics import f1_score

### Set Training Parameters


In [5]:
class Options:
    def __init__(self):
        pass

    def init(self, parser):
        # Global settings
        parser.add_argument('--batch_size', type=int, default=256,
                            help='Batch size for training and validation.')
        parser.add_argument('--nepoch', type=int, default=50,
                            help='Number of training epochs.')
        parser.add_argument('--lr_initial', type=float, default=1e-4,
                            help='Initial learning rate for the optimizer.')
        parser.add_argument('--decay_epoch', type=int, default=20,
                            help='Epoch at which to start decaying the learning rate.')

        # Device settings
        parser.add_argument('--device', type=str, default='cuda',
                            help='Device to use for training ("cuda" for GPU, "cpu" for CPU).')

        # Model settings
        parser.add_argument('--classes', type=int, default=5,
                            help='Number of output classes for classification.')

        # Pretrained model settings
        parser.add_argument('--log_name', type=str, default='241111',
                            help='Identifier for logging and checkpointing.')
        parser.add_argument('--pretrained', type=bool, default=False,
                            help='Whether to load a pretrained model (True/False).')
        parser.add_argument('--pretrained_model', type=str,
                            default='./log/241111/models/ckpt_opt.pt',
                            help='Path to the pretrained model weights file.')

        # Dataset settings
        parser.add_argument('--fs', type=int, default=360,
                            help='Sampling frequency of the ECG data.')
        parser.add_argument('--path_train_data', type=str,
                            default='./dataset/train_data.npy',
                            help='Path to save the training data.')
        parser.add_argument('--path_train_labels', type=str,
                            default='./dataset/train_labels.npy',
                            help='Path to save the training labels.')
        parser.add_argument('--path_val_data', type=str,
                            default='./dataset/val_data.npy',
                            help='Path to save the validation data.')
        parser.add_argument('--path_val_labels', type=str,
                            default='./dataset/val_labels.npy',
                            help='Path to save the validation labels.')
        parser.add_argument('--path_test_data', type=str,
                            default='./dataset/test_data.npy',
                            help='Path to save the test data.')
        parser.add_argument('--path_test_labels', type=str,
                            default='./dataset/test_labels.npy',
                            help='Path to save the test labels.')


        return parser


parser = argparse.ArgumentParser(description='Options for ECG classification')
opt = Options().init(parser).parse_known_args()
print(opt)

(Namespace(batch_size=256, nepoch=50, lr_initial=0.0001, decay_epoch=20, device='cuda', classes=5, log_name='241111', pretrained=False, pretrained_model='./log/241111/models/ckpt_opt.pt', fs=360, path_train_data='./dataset/train_data.npy', path_train_labels='./dataset/train_labels.npy', path_val_data='./dataset/val_data.npy', path_val_labels='./dataset/val_labels.npy', path_test_data='./dataset/test_data.npy', path_test_labels='./dataset/test_labels.npy'), ['-f', '/root/.local/share/jupyter/runtime/kernel-29866c30-64e9-4695-bdf8-621fcb820a2c.json'])


### Helper Functions

In [6]:
# For dataset
class ECGDataloader():  # 1110 - 4096 samples
    def __init__(self, data, label):
        self.data = data
        self.label = label

    def __getitem__(self, index):
        return (torch.tensor(self.data[index], dtype=torch.float), torch.tensor(self.label[index], dtype=torch.float))

    def __len__(self):
        return len(self.data)

# For dataset
def label2index(i):
    m = {'N': 0, 'S': 1, 'V': 2, 'F': 3, 'Q': 4}  # uncomment for 5 classes
    return m[i]


# Create a new directory.
def mkdir(path):
    if not os.path.exists(path):
        os.makedirs(path)


# Normalize the ECG data using Z-score normalization.
def normalize_ecg(ecg_data):
    mean = np.mean(ecg_data, axis=0, keepdims=True)
    std = np.std(ecg_data, axis=0, keepdims=True)
    return (ecg_data - mean) / (std + 1e-8)  # Prevent division by zero


# for using pre-training weights
def optimizer_to(optim, device):
    for param in optim.state.values():
        # Not sure there are any global tensors in the state dict
        if isinstance(param, torch.Tensor):
            param.data = param.data.to(device)
            if param._grad is not None:
                param._grad.data = param._grad.data.to(device)
        elif isinstance(param, dict):
            for subparam in param.values():
                if isinstance(subparam, torch.Tensor):
                    subparam.data = subparam.data.to(device)
                    if subparam._grad is not None:
                        subparam._grad.data = subparam._grad.data.to(device)


# Calculate total number of parameters in a model.
def cal_total_params(our_model):
    total_parameters = 0
    for variable in our_model.parameters():
        shape = variable.size()
        variable_parameters = 1
        for dim in shape:
            variable_parameters *= dim
        total_parameters += variable_parameters

    return total_parameters


# Display a progress bar during training/validation.
class Bar(object):
    def __init__(self, dataloader):
        if not hasattr(dataloader, 'dataset'):
            raise ValueError('Attribute `dataset` not exists in dataloder.')
        if not hasattr(dataloader, 'batch_size'):
            raise ValueError('Attribute `batch_size` not exists in dataloder.')

        self.dataloader = dataloader
        self.iterator = iter(dataloader)
        self.dataset = dataloader.dataset
        self.batch_size = dataloader.batch_size
        self._idx = 0
        self._batch_idx = 0
        self._time = []
        self._DISPLAY_LENGTH = 50

    def __len__(self):
        return len(self.dataloader)

    def __iter__(self):
        return self

    def __next__(self):
        if len(self._time) < 2:
            self._time.append(time.time())

        self._batch_idx += self.batch_size
        if self._batch_idx > len(self.dataset):
            self._batch_idx = len(self.dataset)

        try:
            batch = next(self.iterator)
            self._display()
        except StopIteration:
            raise StopIteration()

        self._idx += 1
        if self._idx >= len(self.dataloader):
            self._reset()

        return batch

    def _display(self):
        if len(self._time) > 1:
            t = (self._time[-1] - self._time[-2])
            eta = t * (len(self.dataloader) - self._idx)
        else:
            eta = 0

        rate = self._idx / len(self.dataloader)
        len_bar = int(rate * self._DISPLAY_LENGTH)
        bar = ('=' * len_bar + '>').ljust(self._DISPLAY_LENGTH, '.')
        idx = str(self._batch_idx).rjust(len(str(len(self.dataset))), ' ')

        tmpl = '\r{}/{}: [{}] - ETA {:.1f}s'.format(
            idx,
            len(self.dataset),
            bar,
            eta
        )
        print(tmpl, end='')
        if self._batch_idx == len(self.dataset):
            print()

    def _reset(self):
        self._idx = 0
        self._batch_idx = 0
        self._time = []


# Define a custom writer class that extends SummaryWriter to log training/validation metrics.
class Writer(SummaryWriter):
    def __init__(self, logdir):
        super(Writer, self).__init__(logdir)

    # Method to log training loss.
    def log_train_loss(self, loss_type, train_loss, step):
        self.add_scalar('train_{}_loss'.format(loss_type), train_loss, step)

    # Method to log validation loss.
    def log_valid_loss(self, loss_type, valid_loss, step):
        self.add_scalar('valid_{}_loss'.format(loss_type), valid_loss, step)

    # Method to log other performance metrics (e.g., accuracy, F1-score).
    def log_score(self, metrics_name, metrics, step):
        # Add a scalar value to the writer with the given metric name.
        self.add_scalar(metrics_name, metrics, step)

def save_checkpoint(exp_log_dir, model, epoch):
    save_dict = {
        "model": model.state_dict(),
        'epoch': epoch
    }
    save_path = os.path.join(exp_log_dir, "ckpt_opt.pt")

    torch.save(save_dict, save_path)

### Define DNN Model


In [7]:
class SimpleCNN(nn.Module):
    def __init__(self, opt, in_ch=1, out_ch=64, in_len=360):
        super(SimpleCNN, self).__init__()

        # Convolutional Layer 1 (Reduced model complexity)
        self.conv1 = nn.Conv1d(in_channels=in_ch, out_channels=out_ch, kernel_size=5, stride=2, padding=2)
        self.bn1 = nn.BatchNorm1d(out_ch)

        # Fully Connected Layer
        self.fc1 = nn.Linear(out_ch * (in_len // 2), opt.classes)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))

        # Flatten before fully connected layer
        x = x.view(x.size(0), -1)

        # Fully connected layer
        x = self.fc1(x)

        return x

### Training the Model


In [10]:

class Trainer:
    def __init__(self, opt):
        self.opt = opt
        self.model = SimpleCNN(opt).to(opt.device)
        self.loss_fn = torch.nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=opt.lr_initial)
        self.scheduler = torch.optim.lr_scheduler.StepLR(self.optimizer, step_size=opt.decay_epoch, gamma=0.1)
        self.writer = Writer(self._get_tboard_dir())
        self.train_loader, self.valid_loader = self._load_data()
        self.log_file_path = os.path.join(self._get_tboard_dir(), 'training_log.txt')

        # Load pretrained model if specified
        if self.opt.pretrained:
            self._load_pretrained_model()

    # Z-score normalization
    def _normalize_ecg(self, ecg_data, axis=1):
        mean = np.mean(ecg_data, axis=axis, keepdims=True)
        std = np.std(ecg_data, axis=axis, keepdims=True)
        return (ecg_data - mean) / (std + 1e-8)  # Prevent division by zero

    def _load_data(self):
        # Load and preprocess data
        train_data = np.load(self.opt.path_train_data)  # Load ECG train data
        train_labels = np.load(self.opt.path_train_labels)  # Load train labels

        val_data = np.load(self.opt.path_val_data)  # Load ECG validation data
        val_labels = np.load(self.opt.path_val_labels)  # Load validation labels

        Y_train = np.array([label2index(i) for i in train_labels])  # Convert labels to indices
        Y_val = np.array([label2index(i) for i in val_labels])  # Convert labels to indices

        # Normalize data along the time axis
        train_data = self._normalize_ecg(train_data)
        val_data = self._normalize_ecg(val_data)

        # Expand dimensions to match model input requirements
        X_train, X_val = np.expand_dims(train_data, 1), np.expand_dims(val_data, 1)

        # Create DataLoader for training and validation
        train_loader = DataLoader(ECGDataloader(X_train, Y_train), batch_size=self.opt.batch_size, shuffle=True,
                                  num_workers=0, pin_memory=True, drop_last=True)
        valid_loader = DataLoader(ECGDataloader(X_val, Y_val), batch_size=self.opt.batch_size, shuffle=False,
                                  num_workers=0)

        return train_loader, valid_loader


    def _get_tboard_dir(self):
        # Initialize directories for logging and model storage
        log_dir = os.path.join(os.getcwd(), 'log', f'{self.opt.log_name}')
        mkdir(log_dir)
        mkdir(os.path.join(log_dir, 'logs'))
        mkdir(os.path.join(log_dir, 'models'))
        return os.path.join(log_dir, 'logs')

    def _load_pretrained_model(self):
        # Load pretrained model weights if specified
        print('Loading the pretrained model...')
        chkpt = torch.load(self.opt.pretrained_model)
        self.model.load_state_dict(chkpt['model'])
        self.optimizer.load_state_dict(chkpt['optimizer'])
        optimizer_to(self.optimizer, self.opt.device)
        print('Resuming Start Epoch:', chkpt['epoch'] + 1)

    def train(self):
        # Print the total number of parameters in the model
        print(
            f'Total parameters: {cal_total_params(self.model):,} ({cal_total_params(self.model) / 1e6:.2f}M)')
        best_f1 = 0
        for epoch in range(1, self.opt.nepoch + 1):
            start_time = time.time()
            self.model.train()
            train_loss = 0

            # Training loop
            for X, Y in Bar(self.train_loader):
                X, Y = X.float().to(self.opt.device), Y.long().to(self.opt.device)  # Move data to device

                # Forward pass and optimization
                outputs = self.model(X)  # Get model predictions
                loss = self.loss_fn(outputs, Y)  # Calculate loss
                self.optimizer.zero_grad()  # Clear previous gradients
                loss.backward()  # Backpropagate to calculate gradients
                self.optimizer.step()  # Update model parameters
                train_loss += loss.item()

            avg_train_loss = train_loss / len(self.train_loader)  # Calculate average training loss
            self.writer.log_train_loss('total', avg_train_loss, epoch)  # Log training loss

            # Validation
            accuracy, f1, avg_val_loss = self._evaluate(self.valid_loader, epoch)  # Evaluate model on validation set
            if f1 > best_f1:  # Save the best model based on accuracy
                best_f1 = f1
                save_checkpoint(self._get_model_dir(), self.model, epoch)

            self.writer.log_score('F1-score', f1, epoch)  # Log validation accuracy
            self.scheduler.step()  # Update learning rate scheduler

            # Logging
            log_message = (
                f'EPOCH[{epoch}] Train Loss: {avg_train_loss:.6f} | Validation Loss: {avg_val_loss:.6f} | Validation F1-score: {f1:.6f} | Time: {time.time() - start_time:.3f}s'
            )
            print(log_message)
            self._log_to_file(log_message)

        print('Training completed.')

    def _evaluate(self, dataloader, epoch):
        # Evaluate the model on the given dataloader
        self.model.eval()
        pred_labels, true_labels = [], []
        total_loss = 0
        with torch.no_grad():
            for X, Y in Bar(dataloader):
                X, Y = X.float().to(self.opt.device), Y.long().to(self.opt.device)  # Move data to device
                pred = self.model(X)  # Get model predictions
                loss = self.loss_fn(pred, Y)  # Calculate loss
                total_loss += loss.item()

                # Get predicted class directly from raw logits
                pred_classes = torch.argmax(pred, dim=1)
                pred_labels.extend(pred_classes.cpu().numpy())
                true_labels.extend(Y.cpu().numpy())

        # Calculate accuracy
        pred_labels = np.array(pred_labels)
        true_labels = np.array(true_labels)
        accuracy = np.mean(pred_labels == true_labels)  # Calculate accuracy
        avg_valid_loss = total_loss / len(dataloader)  # Calculate average validation loss
        f1 = f1_score(true_labels, pred_labels, average='macro')  # Calculate F1-score
        self.writer.log_valid_loss('total', avg_valid_loss, epoch)  # Log validation loss
        return accuracy, f1, avg_valid_loss

    def _get_model_dir(self):
        # Get directory path for saving models
        log_dir = os.path.join(os.getcwd(), 'log', f'{self.opt.log_name}')
        return os.path.join(log_dir, 'models')

    def _log_to_file(self, message):
        # Write log message to file
        with open(self.log_file_path, 'a') as f:
            f.write(message + '\n')

# Parse command-line arguments
opt = Options().init(argparse.ArgumentParser(description='ECG Classification')).parse_known_args()
print(opt[0])

# Set random seeds for reproducibility
torch.manual_seed(1234)
np.random.seed(1234)
random.seed(1234)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(1234)

# Initialize trainer and start training
trainer = Trainer(opt[0])
trainer.train()

Namespace(batch_size=256, nepoch=50, lr_initial=0.0001, decay_epoch=20, device='cuda', classes=5, log_name='241111', pretrained=False, pretrained_model='./log/241111/models/ckpt_opt.pt', fs=360, path_train_data='./dataset/train_data.npy', path_train_labels='./dataset/train_labels.npy', path_val_data='./dataset/val_data.npy', path_val_labels='./dataset/val_labels.npy', path_test_data='./dataset/test_data.npy', path_test_labels='./dataset/test_labels.npy')
Total parameters: 58,117 (0.06M)
EPOCH[1] Train Loss: 0.242441 | Validation Loss: 0.148391 | Validation F1-score: 0.737243 | Time: 2.272s
EPOCH[2] Train Loss: 0.136498 | Validation Loss: 0.125626 | Validation F1-score: 0.795166 | Time: 2.432s
EPOCH[3] Train Loss: 0.119535 | Validation Loss: 0.115019 | Validation F1-score: 0.815862 | Time: 2.281s
EPOCH[4] Train Loss: 0.109964 | Validation Loss: 0.108146 | Validation F1-score: 0.828753 | Time: 2.686s
EPOCH[5] Train Loss: 0.103390 | Validation Loss: 0.104043 | Validation F1-score: 0.84852

#### Test the model

In [12]:
class Tester:
    def __init__(self, opt):
        self.opt = opt
        self.model = SimpleCNN(opt).to(opt.device)
        self._load_pretrained_model()
        self.test_loader = self._load_data()

    # Z-score normalization
    def _normalize_ecg(self, ecg_data, axis=1):
        mean = np.mean(ecg_data, axis=axis, keepdims=True)
        std = np.std(ecg_data, axis=axis, keepdims=True)
        return (ecg_data - mean) / (std + 1e-8)  # Prevent division by zero

    def _load_data(self):
        # Load and preprocess data
        test_data = np.load(self.opt.path_val_data)  # Load ECG validation data
        test_labels = np.load(self.opt.path_val_labels)  # Load validation labels

        test_data = self._normalize_ecg(test_data)
        Y_test = np.array([label2index(i) for i in test_labels])

        # Prepare DataLoader
        X_test = np.expand_dims(test_data, 1)
        test_loader = DataLoader(ECGDataloader(X_test, Y_test), batch_size=self.opt.batch_size, shuffle=False, num_workers=0)
        return test_loader

    def _load_pretrained_model(self):
        # Load the pretrained model for evaluation
        print('Loading the pretrained model...')
        chkpt = torch.load(self.opt.pretrained_model, map_location=self.opt.device)
        self.model.load_state_dict(chkpt['model'])

    def test(self):
        self.model.eval()
        pred_labels, true_labels = [], []
        total_loss = 0
        loss_fn = torch.nn.CrossEntropyLoss()

        with torch.no_grad():
            for X, Y in Bar(self.test_loader):
                X, Y = X.float().to(self.opt.device), Y.long().to(self.opt.device)
                outputs = self.model(X)
                loss = loss_fn(outputs, Y)
                total_loss += loss.item()

                # Get predicted classes
                pred_classes = torch.argmax(outputs, dim=1)
                pred_labels.extend(pred_classes.cpu().numpy())
                true_labels.extend(Y.cpu().numpy())

        # Calculate accuracy
        pred_labels = np.array(pred_labels)
        true_labels = np.array(true_labels)
        accuracy = np.mean(pred_labels == true_labels)
        avg_test_loss = total_loss / len(self.test_loader)
        f1 = f1_score(true_labels, pred_labels, average='macro')  # Calculate F1-score

        # Print results in a more professional format
        print(f'==================== Test Results ====================')
        # print(f'| Test Accuracy    : {accuracy * 100:.2f}%')
        print(f'| Test F1-score    : {f1 * 100:.2f}%')
        print(f'=======================================================')

# Parse command-line arguments
opt = Options().init(argparse.ArgumentParser(description='ECG Classification')).parse_known_args()
print(opt[0])

torch.manual_seed(1234)
np.random.seed(1234)
random.seed(1234)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(1234)

tester = Tester(opt[0])
tester.test()

Namespace(batch_size=256, nepoch=50, lr_initial=0.0001, decay_epoch=20, device='cuda', classes=5, log_name='241111', pretrained=False, pretrained_model='./log/241111/models/ckpt_opt.pt', fs=360, path_train_data='./dataset/train_data.npy', path_train_labels='./dataset/train_labels.npy', path_val_data='./dataset/val_data.npy', path_val_labels='./dataset/val_labels.npy', path_test_data='./dataset/test_data.npy', path_test_labels='./dataset/test_labels.npy')
Loading the pretrained model...


  chkpt = torch.load(self.opt.pretrained_model, map_location=self.opt.device)


| Test F1-score    : 87.67%
