This script will generate a simple CNN-LSTM-Model that can estimate the pulse rate from 4-second windows of pulsatile data.

First: Download the Fantasia-Databse, install ONNX, and install WFDB Toolbox (will suggest the restart of the virtual machine, does not seem to be necessary)

In [1]:
!wget https://physionet.org/static/published-projects/fantasia/fantasia-database-1.0.0.zip
!unzip fantasia-database-1.0.0.zip -d /content/
!pip install onnx
!pip install wfdb

--2022-02-07 13:51:24--  https://physionet.org/static/published-projects/fantasia/fantasia-database-1.0.0.zip
Resolving physionet.org (physionet.org)... 18.18.42.54
Connecting to physionet.org (physionet.org)|18.18.42.54|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 306763953 (293M) [application/zip]
Saving to: ‘fantasia-database-1.0.0.zip’


2022-02-07 13:51:33 (33.1 MB/s) - ‘fantasia-database-1.0.0.zip’ saved [306763953/306763953]

Archive:  fantasia-database-1.0.0.zip
 extracting: /content/fantasia-database-1.0.0/f2o09.dat  
 extracting: /content/fantasia-database-1.0.0/f1y03.ecg  
 extracting: /content/fantasia-database-1.0.0/f2o01.ecg  
 extracting: /content/fantasia-database-1.0.0/f1y05.ecg  
 extracting: /content/fantasia-database-1.0.0/f1o04.dat  
 extracting: /content/fantasia-database-1.0.0/f2y10.ecg  
 extracting: /content/fantasia-database-1.0.0/f1y08.ecg  
 extracting: /content/fantasia-database-1.0.0/f1o06.ecg  
 extracting: /content/fantasia-d

Next: process the Fantasia-Database pulsatile data.

In [2]:
from scipy import signal
from scipy import stats
import torch
import numpy as np
import wfdb

N_SAMPLES = 1000000 # Maximum number of datapoints
D_in = 100 # Input-Dimensions
D_out = 1 # Output-Dimension

fs_fantasia = 250
F_DOWNSAMPLE = 10

i_train = [0,1,2,4,5,6,8,9]
i_test = [3,7]

x_train = torch.zeros(N_SAMPLES, D_in)
y_train = torch.zeros(N_SAMPLES, D_out)
indx = 0
for SQUEEZE in [0.7, 1.0, 1.3]: # Data-augmentation, artificial stretching and squeezing of the signal -> faster, slower HR
  for i in i_train:
    signals, fields = wfdb.rdsamp('fantasia-database-1.0.0/f2o' + str(i+1).zfill(2))
    x_resampled = signal.resample(signals[:,2], int(signals.shape[0]/(F_DOWNSAMPLE*SQUEEZE)))
    annotation = wfdb.rdann('fantasia-database-1.0.0/f2o' + str(i+1).zfill(2),'ecg')
    annotation = annotation.sample / (F_DOWNSAMPLE*SQUEEZE)

    offset = 0
    while offset + D_in + 1 < x_resampled.shape[0]:
      x_train[indx,:] = torch.from_numpy(stats.zscore(x_resampled[offset:offset+D_in])).to(x_train)    
      f_hr = 1/(np.median(np.diff(annotation[(annotation>offset)&(annotation<=(offset+D_in))]))/(fs_fantasia/F_DOWNSAMPLE))
      if f_hr < 3 and f_hr > .5:
        y_train[indx,:] = f_hr * 60         
        indx = indx + 1
      offset = offset + D_in

    signals, fields = wfdb.rdsamp('fantasia-database-1.0.0/f2y' + str(i+1).zfill(2))
    x_resampled = signal.resample(signals[:,2], int(signals.shape[0]/(F_DOWNSAMPLE*SQUEEZE)))
    annotation = wfdb.rdann('fantasia-database-1.0.0/f2y' + str(i+1).zfill(2),'ecg')
    annotation = annotation.sample / (F_DOWNSAMPLE*SQUEEZE)

    offset = 0
    while offset + D_in + 1 < x_resampled.shape[0]:
      x_train[indx,:] = torch.from_numpy(stats.zscore(x_resampled[offset:offset+D_in])).to(x_train)    
      f_hr = 1/(np.median(np.diff(annotation[(annotation>offset)&(annotation<=(offset+D_in))]))/(fs_fantasia/F_DOWNSAMPLE))
      if f_hr < 3 and f_hr > .5:
        y_train[indx,:] = f_hr * 60          
        indx = indx + 1
      offset = offset + D_in
x_train = x_train[:indx,:]
y_train = y_train[:indx,:]

x_test = torch.zeros(N_SAMPLES, D_in)
y_test = torch.zeros(N_SAMPLES, D_out)
indx = 0
for i in i_test:
  signals, fields = wfdb.rdsamp('fantasia-database-1.0.0/f2o' + str(i+1).zfill(2))
  x_resampled = signal.resample(signals[:,2], int(signals.shape[0]/(F_DOWNSAMPLE*SQUEEZE)))
  annotation = wfdb.rdann('fantasia-database-1.0.0/f2o' + str(i+1).zfill(2),'ecg')
  annotation = annotation.sample / (F_DOWNSAMPLE*SQUEEZE)

  offset = 0
  while offset + D_in + 1 < x_resampled.shape[0]:
    x_test[indx,:] = torch.from_numpy(stats.zscore(x_resampled[offset:offset+D_in])).to(x_test)    
    f_hr = 1/(np.median(np.diff(annotation[(annotation>offset)&(annotation<=(offset+D_in))]))/(fs_fantasia/F_DOWNSAMPLE))
    if f_hr < 3 and f_hr > .5:
      y_test[indx,:] = f_hr * 60         
      indx = indx + 1
    offset = offset + D_in

  signals, fields = wfdb.rdsamp('fantasia-database-1.0.0/f2y' + str(i+1).zfill(2))
  x_resampled = signal.resample(signals[:,2], int(signals.shape[0]/(F_DOWNSAMPLE*SQUEEZE)))
  annotation = wfdb.rdann('fantasia-database-1.0.0/f2y' + str(i+1).zfill(2),'ecg')
  annotation = annotation.sample / (F_DOWNSAMPLE*SQUEEZE)

  offset = 0
  while offset + D_in + 1 < x_resampled.shape[0]:
    x_test[indx,:] = torch.from_numpy(stats.zscore(x_resampled[offset:offset+D_in])).to(x_test)    
    f_hr = 1/(np.median(np.diff(annotation[(annotation>offset)&(annotation<=(offset+D_in))]))/(fs_fantasia/F_DOWNSAMPLE))
    if f_hr < 3 and f_hr > .5:
      y_test[indx,:] = f_hr * 60          
      indx = indx + 1
    offset = offset + D_in
x_test = x_test[:indx,:]
y_test = y_test[:indx,:]



  out=out, **kwargs)
  ret = ret.dtype.type(ret / rcount)


## Definition of the Models 

Choose one of three:

- CNN-Model (performs okayish)
- CNN-LSTM-Model (performs quite well and trains efficiently)
- Transformer-Model (converges really slow, performance must be improved)

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    NAME = 'KISMED-Easy-Pulse'
    def __init__(self):
        super(Net, self).__init__()
        # 1 input image channel, 6 output channels, 5x5 square convolution
        # kernel
        self.conv1 = nn.Conv2d(in_channels=1,
                              out_channels=6,
                              kernel_size=(1,5),
                              padding=(0,2),
                              stride=(1,1),
                              bias=False)
        self.conv2 = nn.Conv2d(in_channels=6,
                              out_channels=16,
                              kernel_size=(1,5),
                              padding=(0,2),
                              stride=(1,1),
                              bias=False)
        # an affine operation: y = Wx + b
        self.fc1 = nn.Linear(400, 120)  # 5*5 from image dimension
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 1)

    def forward(self, x):
        # Max pooling over a (2, 2) window
        x = x.unsqueeze(1)
        x = F.max_pool2d(F.relu(self.conv1(x)), (1,2))
        # If the size is a square, you can specify with a single number
        x = F.max_pool2d(F.relu(self.conv2(x)), (1,2))
        x = torch.flatten(x, 1) # flatten all dimensions except the batch dimension
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x



In [4]:
class ePulse(nn.Module):
    NAME = 'KISMED-Easy-Pulse'
    def __init__(self,range):
        super(ePulse, self).__init__()
        # kernel
        self.range = range[1]-range[0]
        self.offset = range[0]

        self.conv1 = nn.Conv1d(in_channels=1,
                              out_channels=6,
                              kernel_size=5,
                              padding=2,
                              stride=1,
                              bias=True)
        self.conv2 = nn.Conv1d(in_channels=6,
                              out_channels=6,
                              kernel_size=5,
                              padding=2,
                              stride=1,
                              bias=False)
        self.conv3 = nn.Conv1d(in_channels=6,
                              out_channels=16,
                              kernel_size=5,
                              padding=2,
                              stride=2,
                              bias=False)
        self.conv4 = nn.Conv1d(in_channels=16,
                              out_channels=16,
                              kernel_size=5,
                              padding=2,
                              stride=2,
                              bias=False)
        self.drop_out = nn.Dropout(p=0.2)
        self.bn = nn.BatchNorm1d(16)
        # an affine operation: y = Wx + b

        self.lstm = nn.LSTM(input_size=16,bidirectional=True,hidden_size=16,batch_first=True)
        self.fc1 = nn.Linear(2*25*16, 120)  # 5*5 from image dimension
        self.fc2 = nn.Linear(120, 60)
        self.fc3 = nn.Linear(60, 1)

    def forward(self, x):
        x = F.relu(self.conv1(x))
       
        x = F.relu(self.conv2(x))
        x = self.drop_out(x)
        x = F.relu(self.conv3(x))
        x = F.relu(self.bn(self.conv4(x))) 
        #lstm in [batch,seq,features]
        out,_ = self.lstm(x.permute(0,2,1))
        x = F.relu(out)
        x = torch.flatten(x, 1) 
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        x = torch.sigmoid(x)*self.range+self.offset
        return x

In [5]:
from torch import Tensor
import math
from typing import Tuple

# 
class PositionalEncoding(nn.Module):

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        Args:
            x: Tensor, shape [seq_len, batch_size, embedding_dim]
        """
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)


class TransformerModel(nn.Module):
  # converges extremly slow
  NAME='KISMED-hard-pulse'
  def __init__(self,seq_length,n_features,d_model=4,nhead=1,dim_feedforward=256):
    super(TransformerModel, self).__init__()
    assert d_model%nhead==0
    self.cnn = nn.Conv1d(in_channels=n_features,
                              out_channels=d_model,
                              kernel_size=5,
                              padding=2,
                              stride=2,
                              bias=False)
    self.pe = PositionalEncoding(d_model,dropout=0.1,max_len=1024)
    encoder_layer = nn.TransformerEncoderLayer(d_model,nhead,dim_feedforward)
    self.encoder = nn.TransformerEncoder(encoder_layer,num_layers=4)
    self.linear = nn.Linear(int(seq_length/2)*d_model,1)

  def forward(self,x):
    x= F.relu(self.cnn(x))
    x = self.pe(x.permute(2,0,1))
    x = self.encoder(x)

    y = self.linear(x.permute(1,0,2).reshape(x.shape[1],-1))
    return y

## Training Utilities
Training with Model-checkpoints and Evaluation

In [6]:
import shutil
def save_ckp(state, is_best, checkpoint_path, best_model_path):
    """
    state: checkpoint we want to save
    is_best: is this the best model so far (minimum val. loss)
    checkpoint_path: path to save newest checkpoint
    best_model_path: path to save best model checkpoint
    """
    f_path = checkpoint_path
    # save checkpoint data to the path given, checkpoint_path
    torch.save(state, f_path)
    # if it is a best model, min validation loss
    if is_best:
        best_fpath = best_model_path
        # copy that checkpoint file to best path given, best_model_path
        shutil.copyfile(f_path, best_fpath)

def load_ckp(checkpoint_fpath, model, optimizer):
    """
    checkpoint_path: path to save checkpoint
    model: model that we want to load checkpoint parameters into       
    optimizer: optimizer we defined in previous training
    """
    # load check point
    checkpoint = torch.load(checkpoint_fpath)
    # initialize state_dict from checkpoint to model
    model.load_state_dict(checkpoint['state_dict'])
    # initialize optimizer from checkpoint to optimizer
    optimizer.load_state_dict(checkpoint['optimizer'])
    # initialize valid_loss_min from checkpoint to valid_loss_min
    valid_loss_min = checkpoint['valid_loss_min']
    # return model, optimizer, epoch value, min validation loss 
    return model, optimizer, checkpoint['epoch'], valid_loss_min

def train(dataloader, model, optimizer,scheduler, loss_fn, checkpoint_path, best_model_path,x_val,y_val,start_epochs=1, n_epochs=15,valid_loss_min_input=1e10):
    '''
    dataloader: dataloader for data ([batch,seq_len],[seq_len])
    model: model input [batch,1,seq_len], output [seq_len]
    optimizer: (Adam)
    scheduler: (required)
    loss_fn: a loss function f(y_pred,y_ref)
    checkpoint_path: str
    best_model_path: str
    x_val: torch Tensor for validation after each epoch
    y_val: torch Tensor ...
    ...

    ----------
    returns model

    '''
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print('Using device:', device)
    model.to(device)

    # initialize tracker for minimum validation loss
    valid_loss_min = valid_loss_min_input 
    dataset_size = len(dataloader.dataset)
    
    for epoch in range(start_epochs, n_epochs+1):
        # initialize variables to monitor training and validation loss
        train_loss = 0.0
        valid_loss = 0.0
        
        
        #train the model
        model.train()
        
        print(f"Epoch {epoch}\n-------------------------------")

        # Loop over batches in an epoch using DataLoader
        epoch_loss = 0
        for id_batch, (x_batch, y_batch) in enumerate(dataloader):

            # print(x_batch.unsqueeze(1).unsqueeze(1).shape)
            x_batch = x_batch.to(device)
            y_batch = y_batch.to(device)
            y_batch_pred = model(x_batch.unsqueeze(1))

            loss = loss_fn(y_batch_pred, y_batch)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            epoch_loss+=loss.item()
            # Every 100 batches, print the loss for this batch
            # as well as the number of examples processed so far 
            if id_batch % 100 == 0:
                loss, current = loss.item(), (id_batch + 1)* len(x_batch)
                print(f"loss: {loss:>7f}  [{current:>5d}/{dataset_size:>5d}]")
        epoch_loss/=id_batch
        epoch_loss = math.sqrt(epoch_loss/id_batch)
        
        #Evaluate and save Model
        model.eval()
        with torch.no_grad():
            y_pred = model(x_val.unsqueeze(1).to(device))
        print(f"RMSE Train Error {epoch_loss} BPM")
        valid_loss = np.sqrt(np.mean(np.square((y_val.numpy()-y_pred.detach().cpu().numpy())))).item()
        print(f"RMSE Test Error {valid_loss} BPM")
        scheduler.step()
        
        # create checkpoint variable and add important data
        checkpoint = {
            'epoch': epoch + 1,
            'valid_loss_min': valid_loss,
            'state_dict': model.state_dict(),
            'optimizer': optimizer.state_dict(),
        }
        
        # save checkpoint
        save_ckp(checkpoint, False, checkpoint_path, best_model_path)
        
        # save best model when validation loss has decreased
        if valid_loss <= valid_loss_min:
            print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(valid_loss_min,valid_loss))
            # save checkpoint as best model
            save_ckp(checkpoint, True, checkpoint_path, best_model_path)
            valid_loss_min = valid_loss
            
    # return trained model
    return model.to('cpu')

def eval(model,x_test,y_test):
  model.cpu().eval()

  with torch.no_grad():
      y_pred = model(x_test.unsqueeze(1))
  print(f"RMSE Test Error {np.sqrt(np.mean(np.square((y_test.numpy()-y_pred.detach().numpy()))))} BPM")

  #RMSE Test Error 5.608696 BPM

  return y_pred

##Training of the Model

In [7]:
!mkdir model

In [8]:
import torch
from torch.optim.lr_scheduler import MultiStepLR
from torch.utils.data import DataLoader, TensorDataset

# Define the batch size and the number of epochs
BATCH_SIZE = 128
N_EPOCHS = 30

dataset = TensorDataset(x_train, y_train)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

#model = Net()
model = ePulse([y_train.min()-2,y_train.max()+2])
#model =TransformerModel(100,1)

loss_fn = torch.nn.MSELoss(reduction='sum')
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
scheduler = MultiStepLR(optimizer, milestones=[10,15,21,25], gamma=0.5)


model = train(dataloader, model, optimizer,scheduler, loss_fn, x_val=x_test,y_val=y_test,
              checkpoint_path='./model/model_check.pt', best_model_path='./model/model_best.pt',start_epochs=0, n_epochs=N_EPOCHS,valid_loss_min_input=1e10)

Using device: cuda
Epoch 0
-------------------------------
loss: 296319.218750  [  128/89139]
loss: 15344.931641  [12928/89139]
loss: 1309.543457  [25728/89139]
loss: 2046.970459  [38528/89139]
loss: 2130.613770  [51328/89139]
loss: 3364.208496  [64128/89139]
loss: 2271.935303  [76928/89139]
RMSE Train Error 3.0983290891894453 BPM
RMSE Test Error 7.903286933898926 BPM
Validation loss decreased (10000000000.000000 --> 7.903287).  Saving model ...
Epoch 1
-------------------------------
loss: 721.903809  [  128/89139]
loss: 1671.505493  [12928/89139]
loss: 1465.114502  [25728/89139]
loss: 2208.099854  [38528/89139]
loss: 520.739807  [51328/89139]
loss: 401.093994  [64128/89139]
loss: 481.621521  [76928/89139]
RMSE Train Error 1.5407634554769984 BPM
RMSE Test Error 7.309746265411377 BPM
Validation loss decreased (7.903287 --> 7.309746).  Saving model ...
Epoch 2
-------------------------------
loss: 2479.265625  [  128/89139]
loss: 1124.248047  [12928/89139]
loss: 1153.694092  [25728/8913

In [9]:
model = ePulse([y_train.min()-2,y_train.max()+2])
model, optimizer, ep, loss = load_ckp('./model/model_best.pt', model, optimizer)

y_pred = eval(model,x_test,y_test)

#RMSE Test Error 6.07645320892334 BPM

RMSE Test Error 5.8528876304626465 BPM


## Analysis of Model-Error

In [10]:
from bokeh.io import push_notebook, show, output_notebook
from bokeh.layouts import row
from bokeh.plotting import figure
output_notebook()

In [11]:
error = (y_test-y_pred).numpy()
res = np.argsort(np.abs(error),axis=None)

print('Abs.Error min:',error.min(),'std:',error.std(),'max:',error.max())
print('Relative Error > 5%:',"{:.2f}% of examples".format(100*(np.abs(error)>0.05*y_test.numpy()).mean()))
print('90% quantil of Abs. Error:',"{:.4f} BPM".format(np.abs(error[res[int(res.shape[0]*0.9)]]).item()))

Abs.Error min: -46.01363 std: 5.7871537 max: 111.99532
Relative Error > 5%: 6.60% of examples
90% quantil of Abs. Error: 3.0340 BPM


Index of median good estimation

In [12]:
#indx = res[len(res)//4]
indx = res[5000]

Implemented naive peak finding for HR-estimation

In [13]:
from scipy.signal import find_peaks

def compute_hr_naive(x,fs):
  y = np.ones(x.shape[0])*70
  for i in range(x.shape[0]):
    indexes, _ = find_peaks(x[i,:],threshold=0.01,distance=3,height=1)
    if indexes.shape[0]>=2:
      time_points =(np.arange(x[i,:].shape[0])/fs)[indexes][[0,-1]]
      y[i] = (len(indexes)-1)/(time_points[1]-time_points[0])*60
  return y

In [14]:
# show estimation for indx
from scipy.signal import find_peaks
indexes, _ = find_peaks(x_test[indx,:].numpy(),threshold=0.01,distance=3,height=1)
print('Peaks are: %s' % (indexes))
time_points =(np.arange(x_test[indx,:].shape[0])/fs_fantasia*F_DOWNSAMPLE)[indexes][[0,-1]]
print('start and end',time_points)
hr_peak_f = (len(indexes)-1)/(time_points[1]-time_points[0])*60
print('Estimated heart rate to',hr_peak_f,'BPM')


Peaks are: [ 3 20 36 53 69 85]
start and end [0.12 3.4 ]
Estimated heart rate to 91.46341463414633 BPM


In [15]:

p1 = figure(plot_width=250, plot_height =250)
p1.line(x=np.arange(x_test[indx,:].shape[0])/fs_fantasia*F_DOWNSAMPLE,y=x_test[indx,:].numpy())
p1.square(x=(np.arange(x_test[indx,:].shape[0])/fs_fantasia*F_DOWNSAMPLE)[indexes],y=x_test[indx,indexes].numpy(),color='red',size=2)
show(p1,notebook_handle=True)
print('True heart rate',y_test[indx].item())
print('Estimated heart rate',y_pred[indx].item())

True heart rate 93.30143737792969
Estimated heart rate 89.91758728027344


In [16]:
y_peak_f=compute_hr_naive(x_test.numpy(),fs_fantasia/F_DOWNSAMPLE)
error_peak = np.sqrt(np.mean((y_test-y_peak_f).numpy()**2))
print("RMSE Naive peak-finder: ",error_peak,"BPM")


RMSE Naive peak-finder:  21.629401498959396 BPM


Statistics of Absolute HR Error 

In [17]:
from bokeh.layouts import gridplot
from bokeh.plotting import figure, show


hist, edges = np.histogram(error, density=True, bins=100)


p = figure(title='Error of estimation', background_fill_color="#fafafa")
p.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:],
           fill_color="navy", line_color="white", alpha=0.5)
p.xaxis.axis_label = 'e'
p.yaxis.axis_label = 'samples'
show(p,notebook_handle=True)

Analyze Distribution of samples with low prediction error vs high prediction error.

The Distributions are similar, hence the model learns over the whole HR-Range consitently.

In [18]:
good_hrs = y_test[res[:len(res)//2]]
bad_hrs = y_test[res[len(res)//2:]]
range_hist = [y_test.min().item(),y_test.max().item()]

hist_good, edges_good = np.histogram(good_hrs, density=True, bins=100,range=range_hist)
hist_bad, edges_bad = np.histogram(bad_hrs, density=True, bins=100,range=range_hist)


p = figure(title='hrs', background_fill_color="#fafafa")
p.quad(top=hist_good, bottom=0, left=edges_good[:-1], right=edges_good[1:],
           fill_color="navy", line_color="white", alpha=0.5, legend_label='good estimate')
p.quad(top=hist_bad, bottom=0, left=edges_bad[:-1], right=edges_bad[1:],
          fill_color="red", line_color="white", alpha=0.5,legend_label='bad estimate')
p.xaxis.axis_label = 'hr / min'
p.yaxis.axis_label = 'samples'
show(p,notebook_handle=True)

### Bland Altman Plot


In [19]:
# credits Simon Kamronn @ https://gist.github.com/simonkamronn/e846d88b8660f1aba7edbeca9afa1bd9
#@title *bland altman code*

from scipy.stats import norm, shapiro, kstest, anderson
import bokeh.plotting as bplt
from bokeh import layouts
from bokeh.models import Span
import pandas as pd
import numpy as np


def vertical_histogram(y):
    vhist, vedges = np.histogram(y, bins=50)
    vzeros = np.zeros(len(vedges)-1)
    vmax = max(vhist)*1.1
    
    pv = bplt.figure(toolbar_location=None, plot_width=200, plot_height=400, x_range=(0, vmax),
                     min_border=10, y_axis_location="right")
    pv.ygrid.grid_line_color = None
    pv.xaxis.major_label_orientation = np.pi/4
    pv.background_fill_color = "#fafafa"
    
    # Plot histogram
    pv.quad(left=0, bottom=vedges[:-1], top=vedges[1:], right=vhist, color="white", line_color="#3A5785")
    
    # Normal fit
    mu, sigma = norm.fit(y)
    xp = np.linspace(y.min(), y.max(), 100)
    pdf = norm.pdf(xp, mu, sigma)
    pdf = (vhist.max()-1)*pdf/pdf.max()    
    
    # Plot pdf of fit
    pv.line(pdf, xp, line_color="#D95B43", line_width=8, alpha=0.7)
    
    return pv


def bland_altman(df, s1, s2, color=None, marker=None, log_transform=True):
    df = df.copy().dropna()
    if log_transform:
        df[s1] = np.log2(df[s1])
        df[s2] = np.log2(df[s2])
    
    # Calc average and difference
    df = df.assign(x=(df[s1] + df[s2])/2, y=df[s1] - df[s2])
    
    # Test for normality
    #print('Shapiro Wilk\n  stats: {}, p: {}'.format(*shapiro(df['y'].values)))
    print('Kolmogorov-Smirnov\n  stats: {}, p: {}'.format(*kstest(df['y'].values, 'norm')))
    print('Anderson-Darling\n  stats: {}, critical_values: {}'.format(*anderson(df['y'].values, 'norm')))

    # Make plots
    p = bplt.figure(title='Bland-Altman',toolbar_location="above", plot_width=700, plot_height=400,)
    p.scatter(x=df['x'], y=df['y'], color=color, marker=marker)
    
    mean_y = Span(location=df['y'].mean(), 
                  dimension='width', line_color='green', 
                  line_dash='dashed', line_width=3)
    
    std_y_upper = Span(location=df['y'].mean() + df['y'].std() * 1.96, 
                       dimension='width', line_color='red', 
                       line_dash='dashed', line_width=3)
    
    std_y_lower = Span(location=df['y'].mean() - df['y'].std() * 1.96, 
                       dimension='width', line_color='red', 
                       line_dash='dashed', line_width=3)

    p.add_layout(mean_y)
    p.add_layout(std_y_upper)
    p.add_layout(std_y_lower)

    p.xaxis.axis_label = 'Average'
    p.yaxis.axis_label = 'Difference ({} - {})'.format(s1, s2)
    
    # Create histogram and norm fit
    pv = vertical_histogram(df['y'])
    p = layouts.Row(p, pv)
    return p

Bland-Altman Plot shows that Error is normally distributed and Model works fine over frequency spektrum

In [20]:
df = pd.DataFrame(data=np.concatenate((y_test.numpy().reshape(-1,1),y_pred.numpy().reshape(-1,1)),axis=1),columns=['hr_pred','hr_ref'])
p = bland_altman(df,'hr_pred','hr_ref',color='blue',marker='circle',log_transform=False)
show(p,notebook_handle=True)

Kolmogorov-Smirnov
  stats: 0.16624746132887147, p: 2.773021124623578e-131
Anderson-Darling
  stats: 961.5344436934756, critical_values: [0.576 0.656 0.786 0.917 1.091]


## Define Meta-Data
Definition of Meta-Data (The original line from Peter's Code does not work for me, I could also not find any definition for OUTPUT_NAMES in his code)

In [21]:
def build_model_metadata(model):
    config_metadata = dict(
        author='C. Hoog Antink, M. Rohr, KIS*MED',
        model=model.NAME,
        filter=True,
        zscore=True,
        crop=True,
        crop_size=D_in,
        fs=int(fs_fantasia / F_DOWNSAMPLE)#,
        #output_names=','.join(model.OUTPUT_NAMES) #This does not seem to be working...
    )
    return config_metadata

## Export Model as onnx model 

In [22]:
import onnx
import torch

model.eval()

output_path = 'lstm_hr_model.onnx'

dummy_input = torch.randn(1, 1, 100)

input_names = ['input']
output_names = ['output']

torch.onnx.export(model, dummy_input, output_path,
                  verbose=True, input_names=input_names, output_names=output_names,
                  dynamic_axes={'input': [0], 'output': [0]})
metadata = build_model_metadata(model)

model = onnx.load_model(output_path)
model.doc_string = output_path.split('.onnx')[0]
for key, value in metadata.items():
    meta = model.metadata_props.add()
    meta.key = key
    meta.value = str(value)
onnx.save_model(model, output_path)

  "Automatically generated names will be applied to each dynamic axes of input {}".format(key))
  "Automatically generated names will be applied to each dynamic axes of input {}".format(key))


graph(%input : Float(*, 1, 100, strides=[100, 100, 1], requires_grad=0, device=cpu),
      %conv1.weight : Float(6, 1, 5, strides=[5, 5, 1], requires_grad=1, device=cpu),
      %conv1.bias : Float(6, strides=[1], requires_grad=1, device=cpu),
      %conv2.weight : Float(6, 6, 5, strides=[30, 5, 1], requires_grad=1, device=cpu),
      %conv3.weight : Float(16, 6, 5, strides=[30, 5, 1], requires_grad=1, device=cpu),
      %fc1.weight : Float(120, 800, strides=[800, 1], requires_grad=1, device=cpu),
      %fc1.bias : Float(120, strides=[1], requires_grad=1, device=cpu),
      %fc2.weight : Float(60, 120, strides=[120, 1], requires_grad=1, device=cpu),
      %fc2.bias : Float(60, strides=[1], requires_grad=1, device=cpu),
      %fc3.weight : Float(1, 60, strides=[60, 1], requires_grad=1, device=cpu),
      %fc3.bias : Float(1, strides=[1], requires_grad=1, device=cpu),
      %117 : Float(16, 16, 5, strides=[80, 5, 1], requires_grad=0, device=cpu),
      %118 : Float(16, strides=[1], requir

  "or define the initial states (h0/c0) as inputs of the model. ")


Download the Model

In [23]:
from google.colab import files
files.download(output_path)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>