In [1]:
from typing import Optional, Tuple
import os
import wfdb
import numpy as np
from sklearn.preprocessing import LabelEncoder


def get_segments(data: wfdb.Record,
                 annotations: wfdb.Annotation,
                 labels: np.ndarray,
                 left_offset: int = 99,
                 right_offset: int = 160,
                 fixed_length: Optional[int] = None) -> Tuple[np.ndarray, np.ndarray]:
    """ Splits a record into segments, one per uninterrupted run of identically-annotated beats, keeping only
    the runs whose annotation symbol is listed in labels.

    :param data:            The arrhythmia signal as a wfdb Record class. Only column 1 of p_signal is read.
    :param annotations:     The set of annotations as a wfdb Annotation class (symbol and sample are read)
    :param labels:          The set of valid labels for the different segments. Segments with different labels are discarded
    :param left_offset:     The number of instances at the left of the first R peak of the segment. Default to 99
    :param right_offset:    The number of instances at the right of the last R peak of the segment. Default to 160
    :param fixed_length:    Should the segments have a fixed length? If fixed_length is a number, then the segments will
                            have the specified length: longer segments are truncated, shorter ones are padded with
                            zeros on the right. Default to None.

    :return:                A tuple that contains the data and the associated labels. Data has a shape of (N, T, V)
                            where N is the number of segments (or instances), V is the number of variables (1 in this case)
                            and T is the number of timesteps of each segment.  Labels are numerically encoded according to the
                            value passed in the labels param. If no run matches labels, empty arrays are returned
                            (data of shape (0, fixed_length or 0, 1)).

    :raises ValueError:     Raised by np.stack when fixed_length is None and the extracted segments differ in length.
    """
    symbols = annotations.symbol
    n_symbols = len(symbols)
    n_samples = len(data.p_signal)

    # Collect the runs of consecutive equal symbols as (first, last, symbol) tuples, where first is the
    # annotation index of the first occurrence of symbol and last the index of its last consecutive occurrence.
    annot_segments = []
    i = 0
    while i < n_symbols:
        first = i
        current_symbol = symbols[i]
        while i < n_symbols and symbols[i] == current_symbol:
            i += 1
        annot_segments.append((first, i - 1, current_symbol))

    # Now, for each run with a valid label, cut the corresponding window out of the signal.
    result = []
    classes = []
    for first, last, symbol in annot_segments:
        if symbol not in labels:
            continue
        classes.append(symbol)

        # Window boundaries, clamped to the record. The upper clamp (n_samples - 1, used as an exclusive
        # bound) is kept exactly as in the original implementation.
        init = max(annotations.sample[first] - left_offset, 0)
        end = min(annotations.sample[last] + right_offset, n_samples - 1)
        end = max(end, init)  # an inverted window yields an empty segment, as range(init, end) did

        # p_signal is a 2D array (samples x channels); column index 1 (the second channel) is extracted.
        new_segment = np.array(data.p_signal[init:end, 1], dtype='float32')

        # Truncate or pad with zeros the segment if necessary
        if fixed_length is not None:
            if len(new_segment) > fixed_length:  # truncate
                new_segment = new_segment[:fixed_length]
            elif len(new_segment) < fixed_length:  # pad with zeros to the right
                number_of_zeros = fixed_length - len(new_segment)
                new_segment = np.pad(new_segment, (0, number_of_zeros), mode='constant', constant_values=0)

        result.append(new_segment)

    # np.stack cannot handle an empty list: return well-formed empty arrays instead of crashing.
    if not result:
        n_steps = fixed_length if fixed_length is not None else 0
        return (np.empty((0, n_steps, 1), dtype='float32'), np.empty((0,), dtype=np.int64))

    # (N, T) -> (N, T, 1): N segments with 1 variable and T timesteps each
    result = np.stack(result, axis=0)
    result = np.reshape(result, (result.shape[0], result.shape[1], 1))
    classes = np.array(classes, dtype=str)

    # Encode labels: from string to numeric.
    label_encoder = LabelEncoder()
    label_encoder.fit(labels)
    classes = label_encoder.transform(classes)

    return (result, classes)



def read_MIT_BIH(path: str,
                 labels: np.ndarray = np.array(['N','L','R','A','V']),
                 left_offset: int = 99,
                 right_offset: int = 160,
                 fixed_length: Optional[int] = 1000) -> Tuple[np.ndarray, np.ndarray]:
    """ It reads the MIT-BIH Arrhythmia data with the specified default configuration of the work presented at:
    Oh, Shu Lih, et al. "Automated diagnosis of arrhythmia using combination of CNN and LSTM techniques with
    variable length heart beats." Computers in biology and medicine 102 (2018): 278-287.

    :param labels:              The labels of the different types of arrhythmia to be employed
    :param path:                The path of the directory where the data files are stored (with or without a trailing
                                separator). Note: The data and annotations files must have the same name, but
                                different extension (annotations must have .atr extension)
    :param left_offset:         The number of instances at the left of the first R peak of the segment. Defaults to 99
    :param right_offset:        The number of instances at the right of the last R peak of the segment. Defaults to 160
    :param fixed_length:        If different to None, the segment will have the specified number of instances. Note that
                                if the segment length > fixed_length it will be truncated, or padded with zeros otherwise.

    :return:                    A tuple that contains the data and the associated labels as an ndarray. Data has a shape of (N, T, V)
                                where N is the number of segments (or instances), V is the number of variables (1 in this case)
                                and T is the number of timesteps of each segment.  Labels are numerically encoded according to the
                                value passed in the labels param.

    :raises ValueError:         If the directory holds no .dat records, or no record yields a segment with a valid label.
    """
    print("reading data...")
    segments = []
    classes = []

    # Record names are the .dat file names without extension. os.listdir returns entries in arbitrary
    # order, so sort them to make the order of the returned instances reproducible.
    files = sorted(os.path.splitext(file)[0] for file in os.listdir(path) if file.endswith('.dat'))
    if not files:
        raise ValueError("No .dat records found in '{}'".format(path))

    for f in files:
        # os.path.join works whether or not path ends with a separator
        record_path = os.path.join(path, f)
        data = wfdb.rdrecord(record_path)
        annotation = wfdb.rdann(record_path, 'atr')

        s, clazz = get_segments(data=data,
                                annotations=annotation,
                                labels=labels,
                                left_offset=left_offset,
                                right_offset=right_offset,
                                fixed_length=fixed_length)

        # A record without any valid segment contributes nothing (and could break the stacking below)
        if len(s) == 0:
            continue

        segments.append(s)
        classes.append(clazz)

    if not segments:
        raise ValueError("No segments with the requested labels found in '{}'".format(path))

    segments = np.vstack(segments)
    classes = np.concatenate(classes)
    print("done.")

    return (segments, classes)

# Read the data. The variable is named data_dir so that the builtin dir() is not shadowed.
data_dir = "physionet.org/files/mitdb/1.0.0/"

X, y = read_MIT_BIH(data_dir)

# Show the shape of the input data. In total there are 16499 time series
# of 1 variable with 1000 timesteps each.
# Each time series has a single associated value, or class, that identifies the type of arrhythmia
print("X shape: ", X.shape)
print("y shape: ", y.shape)

reading data...
done.
X shape:  (16499, 1000, 1)
y shape:  (16499,)


In [64]:
%load_ext autoreload
%autoreload 2

import sys
import os
import inspect
import pytorch_lightning as pl
import torch
from TSFEDL.data import MIT_BIH
from inspect import signature
from inspect import signature
import torch.nn as nn
import numpy as np
sys.path.append(os.path.abspath(os.path.join(os.path.dirname("__file__"), '..')))
from SADL.time_series.algorithms import tsfedl
from TSFEDL.models_pytorch import OhShuLih

# Configuration of this cell
N_SAMPLES = 10          # only the first N_SAMPLES instances are kept (presumably to keep this demo fast)
TRAIN_FRACTION = 0.8    # share of the kept instances used for training
SPLIT_SEED = 42         # fixed seed so that the train/test split is the same on every run

mit_bih = MIT_BIH(path="physionet.org/files/mitdb/1.0.0/", return_hot_coded=False)
mit_bih.x = mit_bih.x[:N_SAMPLES]
mit_bih.y = mit_bih.y[:N_SAMPLES]
tra_size = int(len(mit_bih) * TRAIN_FRACTION)
tst_size = len(mit_bih) - tra_size

# A seeded generator makes random_split deterministic across kernel restarts
train, test = torch.utils.data.random_split(mit_bih,
                                            [tra_size, tst_size],
                                            generator=torch.Generator().manual_seed(SPLIT_SEED))
train_loader = torch.utils.data.DataLoader(train, batch_size=1, num_workers=0)
test_loader = torch.utils.data.DataLoader(test, batch_size=1, num_workers=0)

# Build the model wrapper; it is only inspected here, not trained.
kwargs = {"algorithm_": "ohshulih", "max_epochs": 200, "in_features":1, "label_parser": True}
model1 = tsfedl.TsfedlAnomalyDetection(**kwargs)

print(model1.get_params())
print(model1.model.top_module)


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
reading data...
done.
aaaain_features
aaaaloss
aaaametrics
aaaaoptimizer
aaaatop_module
{'label_parser': True, 'algorithm_': 'OhShuLih', 'in_features': 1, 'loss': CrossEntropyLoss(), 'metrics': None, 'pytorch_params_': {'max_epochs': 200}}


In [33]:
from torch.nn import functional as F
def acc_from_logits(y_hat, y):
    """Computes the classification accuracy of a batch from raw logits.

    :param y_hat:   Tensor of logits; the class dimension is dim 1.
    :param y:       Tensor with the true class index of each instance.

    :return:        The fraction of instances whose highest-scoring class equals the true class, as a Python float.
    """
    # Softmax is monotonic along the class dimension, so the argmax of the logits is
    # already the predicted class: no need to normalise them first.
    preds = torch.argmax(y_hat, dim=1)
    acc = (preds == y).sum().item() / len(y)
    return acc

# Build the model with an explicit loss and an accuracy metric
kwargs = {"max_epochs": 200,"algorithm_": "ohshulih", "in_features": 1, "loss": nn.CrossEntropyLoss(), "metrics":{"acc": acc_from_logits},}
model1 = tsfedl.TsfedlAnomalyDetection(**kwargs)

# Train and test the underlying module with a Lightning trainer,
# taking the number of epochs from kwargs so it is defined in a single place
trainer = pl.Trainer(max_epochs=kwargs["max_epochs"])
trainer.fit(model1.model, train_loader)
test_results = trainer.test(model1.model, test_loader)
print(test_results[0]['test_acc_epoch'])

# Show the raw outputs for every test batch. This is inference only: switch to evaluation
# mode and disable gradient tracking so no autograd graph is built.
model1.model.eval()
with torch.no_grad():
    for batch_data, _batch_labels in test_loader:
        X_pred = model1.model(batch_data)
        print(X_pred[0])


GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name         | Type                | Params
-----------------------------------------------------
0 | loss         | CrossEntropyLoss    | 0     
1 | classifier   | OhShuLih_Classifier | 685   
2 | convolutions | Sequential          | 420   
3 | lstm         | LSTM                | 2.2 K 
-----------------------------------------------------
3.3 K     Trainable params
0         Non-trainable params
3.3 K     Total params
0.013     Total estimated model params size (MB)


Training: |                                                                                                   …

`Trainer.fit` stopped: `max_epochs=200` reached.
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: |                                                                                                    …

────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
       Test metric             DataLoader 0
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
     test_acc_epoch                 1.0
     test_loss_epoch       0.0014386551920324564
────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
1.0
tensor([  3.5918, -13.7475,  -2.3540, -13.3412, -13.1747],
       grad_fn=<SelectBackward0>)
tensor([  3.9990, -15.2864,  -2.5999, -14.8926, -14.6999],
       grad_fn=<SelectBackward0>)


In [56]:
# Manual evaluation loop over the test loader, using the loss and metrics stored in the model
print(model1.get_params())
# Reset the results of the previous cell (presumably so they are not confused with the manual evaluation below)
test_results = None

# Set model to evaluation mode
model1.model.eval()

test_loss = 0.0
test_metrics = {name: 0.0 for name in model1.model.metrics.keys()}

# Disable gradient computation
with torch.no_grad():
    # The batch variables are deliberately not called x / y, so the notebook-level
    # y (labels loaded in the first cell) is not overwritten.
    for batch_x, batch_y in test_loader:
        # Forward pass
        y_hat = model1.model(batch_x)

        # Accumulate the loss
        loss = model1.model.loss(y_hat, batch_y)
        test_loss += loss.item()

        # Accumulate every metric
        for name, f in model1.model.metrics.items():
            value = f(y_hat, batch_y)
            test_metrics[name] += value

# Average loss and metrics over the batches (every batch weighs the same)
num_batches = len(test_loader)
test_loss /= num_batches
test_metrics = {name: value / num_batches for name, value in test_metrics.items()}
print(test_metrics['acc'])


{'label_parser': None, 'algorithm_': 'OhShuLih', 'in_features': 1, 'loss': CrossEntropyLoss(), 'metrics': {'acc': <function acc_from_logits at 0x7fce22e9d300>}, 'pytorch_params_': {'max_epochs': 200}}
1.0
