This notebook trains the model that predicts SCP codes from the PTB-XL ECG
recordings.

In [1]:
import pandas as pd
import numpy as np
import wfdb
from ast import literal_eval

This dataset already includes a "strat_fold" column created by the authors, so
the training data is chosen by the authors rather than split randomly, which
could otherwise produce unwanted biases.

In [2]:
# Root folder of the dataset. The trailing slash matters because file paths are
# built from it by plain string concatenation.
ptb_dir = "ptb-xl-a-large-publicly-available-electrocardiography-dataset-1.0.1/"

# True -> low resolution recordings, False -> high resolution recordings
lowres = True
resolution_suffix = "lr" if lowres else "hr"
filetype = "filename_" + resolution_suffix

# Read only what is needed: the scp codes, the strat fold and the filename
# column that matches the chosen resolution. ecg_id becomes the index.
wanted_columns = ["ecg_id", "scp_codes", "strat_fold", filetype]
df = pd.read_csv(
  ptb_dir + "ptbxl_database.csv",
  index_col=["ecg_id"],
  usecols=wanted_columns,
)
df.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 21837 entries, 1 to 21837
Data columns (total 3 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   scp_codes    21837 non-null  object
 1   strat_fold   21837 non-null  int64 
 2   filename_lr  21837 non-null  object
dtypes: int64(1), object(2)
memory usage: 682.4+ KB


In [3]:
# Preview the first rows to check the loaded columns
df.head()

Unnamed: 0_level_0,scp_codes,strat_fold,filename_lr
ecg_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1,"{'NORM': 100.0, 'LVOLT': 0.0, 'SR': 0.0}",3,records100/00000/00001_lr
2,"{'NORM': 80.0, 'SBRAD': 0.0}",2,records100/00000/00002_lr
3,"{'NORM': 100.0, 'SR': 0.0}",5,records100/00000/00003_lr
4,"{'NORM': 100.0, 'SR': 0.0}",3,records100/00000/00004_lr
5,"{'NORM': 100.0, 'SR': 0.0}",4,records100/00000/00005_lr


In [4]:
def load_waveforms():
  """Reads the waveform of every record listed in `df[filetype]`.

  Which files are read (low or high resolution) follows from `filetype`, which
  is derived from the `lowres` variable. Each file is read with `wfdb.rdsamp`,
  which returns a tuple (signal, meta); only element [0], the signal, is kept.

  Returns:
      array: the signals of all records combined into one numpy array, in the
      row order of `df`.
      NOTE(review): presumably shaped `(n_records, 10 * frequency, 12)`
      (100Hz / 500Hz depending on `lowres`) - confirm against `X.shape`.
  """
  signals = []
  for record_name in df[filetype]:
    # Paths in the csv are relative to the dataset directory
    signals.append(wfdb.rdsamp(ptb_dir + record_name)[0])
  return np.array(signals)

# Read the waveform of every record in df into a single array (one file read
# per row of df)
X = load_waveforms()

# Here we process the scp_codes into a multilabel array

In [5]:
# Load the scp statements table; its first column (the statement code) is used
# as the index so codes can be looked up by name
scp_statements_path = ptb_dir + 'scp_statements.csv'
scp_df = pd.read_csv(scp_statements_path, index_col=0)
scp_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 71 entries, NDT to TRIGU
Data columns (total 12 columns):
 #   Column                         Non-Null Count  Dtype  
---  ------                         --------------  -----  
 0   description                    71 non-null     object 
 1   diagnostic                     44 non-null     float64
 2   form                           19 non-null     float64
 3   rhythm                         12 non-null     float64
 4   diagnostic_class               44 non-null     object 
 5   diagnostic_subclass            44 non-null     object 
 6   Statement Category             71 non-null     object 
 7   SCP-ECG Statement Description  71 non-null     object 
 8   AHA code                       37 non-null     float64
 9   aECG REFID                     23 non-null     object 
 10  CDISC Code                     13 non-null     object 
 11  DICOM Code                     13 non-null     object 
dtypes: float64(4), object(8)
memory usage: 7.2+ KB


In [6]:
# Each scp_codes cell holds a dict written out as a string; parse it back into
# a real python dict
scp_codes = df["scp_codes"].apply(literal_eval)

# Number of scp statements (one per row of scp_df)
scp_length = scp_df.shape[0]

def create_multilabel(scpc: dict, threshold: float = 50, codes=None):
  """Turns one record's scp statements into a 0/1 multilabel vector.

  Args:
      scpc: dict mapping scp statement code -> likelihood, as stored in the
        `scp_codes` column.
      threshold: a statement is labelled 1 only when its likelihood is
        strictly greater than this value. Defaults to 50.
      codes: pandas Index of every known statement code; the position of a
        code in it is its position in the output. Defaults to `scp_df.index`.

  Returns:
      list: `len(codes)` integers, 1 at the position of every statement that
      passed the threshold and 0 everywhere else.

  Raises:
      KeyError: if `scpc` contains a code above the threshold that is not in
        `codes`.
  """
  if codes is None:
    # Looked up at call time so the function follows the current scp_df
    codes = scp_df.index

  res = [0] * len(codes)

  for code, likelihood in scpc.items():
    # Anything at or below the threshold is ditched.
    # NOTE(review): entries with likelihood 0.0 (e.g. 'SR': 0.0 in the
    # previewed rows) are therefore never labelled; the PTB-XL description
    # reportedly uses 0 for "unknown likelihood" - confirm dropping them is
    # intended.
    if likelihood > threshold:
      res[codes.get_loc(code)] = 1

  return res
    
# Build one multilabel vector per record from its parsed scp_codes and store
# it in a new "result" column of df
df["result"] = scp_codes.apply(create_multilabel)
df.head()

Unnamed: 0_level_0,scp_codes,strat_fold,filename_lr,result
ecg_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1,"{'NORM': 100.0, 'LVOLT': 0.0, 'SR': 0.0}",3,records100/00000/00001_lr,"[0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
2,"{'NORM': 80.0, 'SBRAD': 0.0}",2,records100/00000/00002_lr,"[0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
3,"{'NORM': 100.0, 'SR': 0.0}",5,records100/00000/00003_lr,"[0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
4,"{'NORM': 100.0, 'SR': 0.0}",3,records100/00000/00004_lr,"[0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."
5,"{'NORM': 100.0, 'SR': 0.0}",4,records100/00000/00005_lr,"[0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ..."


In [7]:
# Gets the Y (labels) of the dataset: the per-record label lists stacked into
# one float32 array
Y = np.asarray(df["result"].tolist(), dtype="float32")
print(len(X), len(Y))

21837 21837


In [8]:
# Train/test split driven by the strat_fold column that ships with the
# dataset: one fold is held out for testing, all the others are used for
# training.

# Fold held out for testing
test_fold = 10
test_sel = df["strat_fold"] == test_fold
train_sel = df["strat_fold"] != test_fold

# X and Y are in the row order of df, so the same positional boolean masks
# select matching rows in both
train_mask = train_sel.to_numpy()
test_mask = test_sel.to_numpy()

X_tr, Y_tr = X[train_mask], Y[train_mask]
X_te, Y_te = X[test_mask], Y[test_mask]

print(len(X_tr), len(Y_tr), len(X_te), len(Y_te))

19634 19634 2203 2203


## From this point, we can do the model training

In [9]:
# Install efficientnet lite keras
# %pip install -q git+https://github.com/sebastian-sz/efficientnet-lite-keras@main

In [12]:
# Load the model with tensorflow
import tensorflow as tf

# Models
from models import efficient_net
from Inception.inception import Classifier_INCEPTION as Inception
import MiniRocket.minirocket_multivariate as MiniRocket
from sklearn.linear_model import RidgeClassifierCV
from sktime.classification.dictionary_based import BOSSEnsemble

In [12]:
def fit_efficient():
  """Builds the Efficient Net model and fits it on the training split.

  Returns:
      tuple: `(model, X_te_r)` - the fitted model and the test waveforms
      reshaped the same way as the training input, ready to be fed to it.
  """
  model = efficient_net(scp_length)

  # Every record is reshaped to (100, 120) before it is given to the model.
  # NOTE(review): this only works when a record holds exactly 12,000 values -
  # looks like it assumes the low resolution data; confirm before switching
  # `lowres` off.
  def as_model_input(waveforms):
    return waveforms.reshape((len(waveforms), 100, 120))

  train_input = as_model_input(X_tr)
  test_input = as_model_input(X_te)

  model.fit(train_input, Y_tr, epochs=10, batch_size=32)

  return model, test_input

def fit_inception():
  """Builds the Inception classifier and fits its model on the training split.

  10% of the training data is held back for validation during fitting.

  Returns:
      The fitted model taken from the Inception wrapper (`in_model.model`).
  """
  # Same values are given to the wrapper and to the fit call
  batch_size = 1000
  epochs = 10

  in_model = Inception(
    "/inceptionres",
    X.shape[1:],  # shape of a single record
    scp_length,
    batch_size=batch_size,
    nb_epochs=epochs,
  )

  # Of the wrapper's callbacks only the first one is kept (the reduce learning
  # rate callback)
  kept_callbacks = [in_model.callbacks[0]]

  model = in_model.model
  model.fit(
    X_tr,
    Y_tr,
    batch_size=batch_size,
    epochs=epochs,
    validation_split=0.1,
    callbacks=kept_callbacks,
  )

  return model

def fit_mini_rocket(classifier=None):
  """Transforms the training split with MiniRocket and fits a classifier on it.

  Args:
      classifier: estimator with a `fit(X, y)` method, fitted in place on the
        transformed training data. When omitted, a new
        `RidgeClassifierCV(alphas = np.logspace(-3, 3, 10), normalize = True)`
        is created for this call.

  Returns:
      tuple: `(classifier, params)` - the fitted classifier and the MiniRocket
      parameters needed to transform other data the same way.
  """
  if classifier is None:
    # Built per call instead of in the signature: a default argument is
    # created once when the function is defined, so every call would share
    # (and re-fit) the same estimator object.
    # NOTE(review): `normalize` is not accepted by recent scikit-learn
    # releases - confirm the installed version still supports it.
    classifier = RidgeClassifierCV(alphas = np.logspace(-3, 3, 10), normalize = True)

  # NOTE(review): the number of scp statements is passed as the second
  # positional argument of MiniRocket.fit - confirm that is what this
  # parameter expects.
  params = MiniRocket.fit(X_tr, scp_length)
  X_tr_trans = MiniRocket.transform(X_tr, params)

  classifier.fit(X_tr_trans, Y_tr)

  return classifier, params

def fit_boss_ens():
  """Fits a BOSS ensemble (`max_ensemble_size=5`) on the training split.

  Returns:
      BOSSEnsemble: the fitted classifier
  """
  boss = BOSSEnsemble(max_ensemble_size=5)
  boss.fit(X_tr, Y_tr)
  return boss

In [13]:
# fit_mini_rocket returns a (classifier, params) tuple, so `model` holds both
model = fit_mini_rocket()

Epoch 1/10


In [31]:
# acc = tf.keras.metrics.Accuracy()
# acc.update_state([Y_te], [result])
# acc.result().numpy()

1.0