This notebook contains code for replicating experiments from the paper "Uncertainty-Aware Deep Ensembles for Reliable and Explainable Predictions of Clinical Time Series" on ECG200 dataset. See [this webiste](http://www.timeseriesclassification.com/description.php?Dataset=ECG200) for more info on downloading the data. Remember to enable the use of GPU to speed up computations.

The following cell loads the packages used in this eexperiment, and mounts the drive. You need to change the "path" variable to the directory where the ecg data is stored in .arff format.

In [None]:
#@title Load packages and data
import os
import sys
import time
import copy
import random
import torch as th
import numpy as np
import pandas as pd
import seaborn as sns
import torch.nn as nn
import matplotlib.cm as cm
import torch.nn.init as init
import matplotlib.pyplot as plt

from google.colab import drive
from itertools import product
from matplotlib.colors import Normalize
from sklearn.decomposition import PCA
from IPython.display import clear_output
from sklearn.metrics import accuracy_score, confusion_matrix
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import LabelBinarizer, MinMaxScaler, StandardScaler

drive.mount('/content/drive')

path = 'INSERT YOUR PATH HERE'

def to_np(x):
    return x.cpu().detach().numpy()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


The following cell defines a function that loads data in the .arff format. The function is based on the same function from the sktime libray found at [this website](https://github.com/alan-turing-institute/sktime).

In [None]:
#@title Function for loading data in arff format. Code adopted from sktime library


def load_from_arff_to_dataframe(full_file_path_and_name, has_class_labels=True, return_separate_X_and_y=True, replace_missing_vals_with='NaN'):

    instance_list = []
    class_val_list = []

    data_started = False
    is_multi_variate = False
    is_first_case = True

    with open(full_file_path_and_name, 'r') as f:
        for line in f:

            if line.strip():
                if is_multi_variate is False and "@attribute" in line.lower() and "relational" in line.lower():
                    is_multi_variate = True

                if "@data" in line.lower():
                    data_started = True
                    continue

                # if the 'data tag has been found, the header information has been cleared and now data can be loaded
                if data_started:
                    line = line.replace("?", replace_missing_vals_with)

                    if is_multi_variate:
                        if has_class_labels:
                            line, class_val = line.split("',")
                            class_val_list.append(class_val.strip())
                        dimensions = line.split("\\n")
                        dimensions[0] = dimensions[0].replace("'", "")

                        if is_first_case:
                            for d in range(len(dimensions)):
                                instance_list.append([])
                            is_first_case = False

                        for dim in range(len(dimensions)):
                            instance_list[dim].append(pd.Series([float(i) for i in dimensions[dim].split(",")]))

                    else:
                        if is_first_case:
                            instance_list.append([])
                            is_first_case = False

                        line_parts = line.split(",")
                        if has_class_labels:
                            instance_list[0].append(pd.Series([float(i) for i in line_parts[:len(line_parts)-1]]))
                            class_val_list.append(line_parts[-1].strip())
                        else:
                            instance_list[0].append(pd.Series([float(i) for i in line_parts[:len(line_parts)]]))

    x_data = pd.DataFrame(dtype=np.float32)
    for dim in range(len(instance_list)):
        x_data['dim_' + str(dim)] = instance_list[dim]

    if has_class_labels:
        if return_separate_X_and_y:
            return x_data, np.asarray(class_val_list)
        else:
            x_data['class_vals'] = pd.Series(class_val_list)

    return x_data

In [None]:
#@title Dataset loader and create dataset
def load_dataset(split, path, return_X_y=True):

    if split in ["TRAIN", "TEST"]:
        fname = 'ECG200_' + split + '.arff'
        abspath = os.path.join(path, fname)
        X, y = load_from_arff_to_dataframe(abspath)

    y = LabelBinarizer().fit_transform(y)
    if y.shape[1] == 1: y = np.hstack((y, 1 - y))

    X = pd.DataFrame(X).to_numpy()
    y = pd.DataFrame(y).to_numpy()

    X = np.array(np.ndarray.tolist(X), dtype=np.float32)
    y = np.array(np.ndarray.tolist(y), dtype=np.int32)

    where_are_NaNs = np.isnan(X)
    X[where_are_NaNs] = 0

    scaler = StandardScaler()

    for i in range(X.shape[1]):
        X[:, i, :] = scaler.fit_transform(X[:, i, :])

    # Return appropriately
    if return_X_y:
        return X, y
    else:
        X['class_val'] = pd.Series(y)
        return X

class TSDatasetSupervised(Dataset):
    def __init__(self, split, device='cuda'):

        self.device = device

        x, y = load_dataset(split=split, path=path, return_X_y=True)

        self.x = th.tensor(x, dtype=th.float, device=device)
        self.y = th.tensor(y, dtype=th.long, device=device).argmax(1)
        self.n_c = 2

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):      
        return self.x[idx], self.y[idx]

The following cell creates the class for the convolutional neural network used in this experiment. The model is based on the fully convolutional network of Want et.al.: [Time Series Classification from Scratch with Deep Neural Networks: A Strong Baseline](https://arxiv.org/abs/1611.06455).



In [None]:
#@title Define CNN

class CNN(nn.Module):
    def __init__(self, n_in, n_c):
        super(CNN, self).__init__()

        n_hid = 128
        act = nn.ReLU()

        self.l1 = nn.Sequential(
            nn.Conv1d(n_in, n_hid, kernel_size=7, padding=3),
            nn.BatchNorm1d(n_hid),
            act
        )

        self.l2 = nn.Sequential(
            nn.Conv1d(n_hid, 2*n_hid, kernel_size=5, padding=2),
            nn.BatchNorm1d(2*n_hid),
            act
        )

        self.l3 = nn.Sequential(
            nn.Conv1d(2*n_hid, n_hid, kernel_size=3, padding=1),
            nn.BatchNorm1d(n_hid),
            act
        )

        self.gap = nn.Sequential(
            nn.AdaptiveAvgPool1d(1),
            nn.Flatten()
        )

        self.out = nn.Linear(n_hid, n_c)

    def forward(self, x):

        l1 = self.l1(x)
        l2 = self.l2(l1)
        l3 = self.l3(l2)

        gap = self.gap(l3)
        out = self.out(gap)

        return out

    def CAM(self, x):
 
        l1 = self.l1(x)
        l2 = self.l2(l1)
        l3 = self.l3(l2)

        out = th.matmul(l3.transpose(2, 1), self.out.weight.T)
        out = nn.functional.relu(out)
        out = out.cpu().detach().numpy()

        return out



The following cell creates function that are used throughout the rest of the code.

In [None]:
#@title Define some useful functions

def calc_metrics(y, y_hat):

    TN, FP, FN, TP = confusion_matrix(y, y_hat).ravel()

    precision = TP / (TP+FP)
    FallOut = FP / (FP+TN)
    recall = TP / (TP+FN)
    NPV = TN / (TN+FN)

    return [precision, recall, NPV, FallOut]


def get_ensemble_pred(x, model_list):

    for model_idx, model in enumerate(model_list):
        out = model(xte)
        if model_idx == 0: y_hat = to_np(th.nn.functional.softmax(out, dim=1))
        else: y_hat += to_np(th.nn.functional.softmax(out, dim=1))

    y_hat = y_hat / len(model_list)

    return y_hat

The following cell traings the single model and the ensemble model. In this code, "M" denotes the number of models in the ensemble, and "num_repeats" denotes the number of times the experiments will be repeated. This is set to 1 here, but in the paper we do 10 independet training runs.



In [None]:
#@title Experiment

metrics_list = []
ensemble_metrics_list = []

M, num_repeats = 10, 1
model_list = []


training_set = TSDatasetSupervised('TRAIN')
test_set = TSDatasetSupervised('TEST')

batch_size_tr = len(training_set.x)
batch_size_te = len(test_set.x)

training_generator = DataLoader(training_set, batch_size=batch_size_tr,
                                shuffle=True, drop_last=False)
test_generator = DataLoader(test_set, batch_size= batch_size_te,
                                shuffle=True, drop_last=False)

for run in range(num_repeats):
    for m in range(M):

        print(f"Model number {m}")

        model = CNN(training_set.x.shape[1], training_set.n_c).to('cuda')
        criterion = th.nn.CrossEntropyLoss()
        optimizer = th.optim.Adam(model.parameters())

        LossList, AccList, epochs = [], [], 150

        for i in range(epochs):
            for xtr, ytr in training_generator:

                model.train()
                optimizer.zero_grad()

                xtr = xtr.to('cuda')
                out = model(xtr)

                loss = criterion(out, ytr)
                loss.backward()
                LossList.append(loss.item())
                optimizer.step()

            with th.no_grad():
                model.eval()
                for xte, yte in test_generator:

                    out = model(xte)
                    y_hat = th.argmax(out, 1)
                    accuracy = th.eq(yte, y_hat).sum().float() / len(xte)
                    AccList.append(accuracy.item())

        model_list.append(copy.deepcopy(model))
        metrics_list.append(calc_metrics(to_np(yte), to_np(y_hat)))
        print(metrics_list[-1])

    y_hat_ensemble = get_ensemble_pred(xte, model_list)
    ensemble_metrics_list.append(calc_metrics(to_np(yte), y_hat_ensemble.argmax(1)))
    print(ensemble_metrics_list[-1])


The following cell makes a prediction for a random sample in the test data and plots the time steps that are most relevant for the prediction, and the uncertainty of the relevance scores.



In [None]:
#@title CAM illustration

model.eval()
N, T = test_set.x.shape[0], test_set.x.shape[-1]
sample = np.random.choice(list(np.arange(0, N, 1)), 1)[0]

my_cmap_mean = cm.get_cmap('Greens')
my_cmap_std = cm.get_cmap('Reds')
sm_mean = plt.cm.ScalarMappable(cmap=my_cmap_mean, norm=Normalize(vmin=0, vmax=1))
sm_std = plt.cm.ScalarMappable(cmap=my_cmap_std, norm=Normalize(vmin=0, vmax=1))

x_plot = np.arange(0, T, 1)

interpret_array =  np.array([model.CAM(test_set.x) for model in model_list])
interpret_array = interpret_array / (interpret_array.max(1, keepdims=True)+1.e-8)

interpret_mean = interpret_array.mean(0)
interpret_std = interpret_array.std(0)

pred = get_ensemble_pred(test_set.x, model_list).argmax(1)
y_samp = to_np(test_set.y[sample])
pred_samp = pred[sample]

scale_col_mean = np.abs(interpret_mean[sample, :, pred_samp] / (np.abs(interpret_mean[sample, :, pred_samp]).max())).reshape(T, 1)
color_weight_mean = np.ones((T, 3))
color_weight_mean = np.concatenate((color_weight_mean, scale_col_mean), 1)
plot_col_mean = my_cmap_mean(interpret_mean[sample, :, pred_samp])*color_weight_mean

plt.figure(1, figsize=(15, 5))
plt.plot(x_plot, to_np(test_set.x[sample])[0], color='black')
plt.bar(x_plot, 200*np.ones_like(x_plot), bottom=-100, width=1.0,
        color=plot_col_mean)
plt.xlim(0, T)
plt.ylim(to_np(test_set.x[sample])[0].min(), to_np(test_set.x[sample])[0].max())
plt.title(f"Mean relevance score. True: {y_samp}. Predicted: {pred_samp}")
plt.ylabel('mV')
plt.xlabel('ms')
plt.show()

scale_col_std = np.abs(interpret_std[sample, :, pred_samp] / (np.abs(interpret_std[sample, :, pred_samp]).max())).reshape(T, 1)
color_weight_std = np.ones((T, 3))
color_weight_std = np.concatenate((color_weight_std, scale_col_std), 1)
plot_col_std = my_cmap_std(interpret_std[sample, :, pred_samp])*color_weight_std

plt.figure(2, figsize=(15, 5))
plt.plot(x_plot, to_np(test_set.x[sample])[0], color='black')
plt.bar(x_plot, 200*np.ones_like(x_plot), bottom=-100, width=1.0,
        color=plot_col_std)
plt.xlim(0, T)
plt.ylim(to_np(test_set.x[sample])[0].min(), to_np(test_set.x[sample])[0].max())
plt.title(f"Uncertainty of relevance score. True: {y_samp}. Predicted: {pred_samp}")
plt.ylabel('mV')
plt.xlabel('ms')
plt.show()

filtered_time_steps = (scale_col_std.squeeze() < scale_col_std.squeeze().mean())*1.0
scale_col_filtered = np.reshape(scale_col_mean.squeeze() * filtered_time_steps, (T, 1))
color_weight_filtered = np.ones((T, 3))
color_weight_filtered = np.concatenate((color_weight_filtered, scale_col_filtered), 1)
plot_col_filtered = my_cmap_mean(interpret_mean[sample, :, pred_samp])*color_weight_filtered


plt.figure(3, figsize=(15, 5))
plt.plot(x_plot, to_np(test_set.x[sample])[0], color='black')
plt.bar(x_plot, 200*np.ones_like(x_plot), bottom=-100, width=1.0,
        color=plot_col_filtered)
plt.xlim(0, T)
plt.ylim(to_np(test_set.x[sample])[0].min(), to_np(test_set.x[sample])[0].max())
plt.ylabel('mV')
plt.xlabel('ms')
plt.title(f"Uncertainty-filtered mean relevance score. True: {y_samp}. Predicted: {pred_samp}")
plt.show()

The following cell computes the consistency for single and ensemble model. This cell takes some time to run.

In [None]:
#@title Compute consistency over independent training runs


num_repeats, M = 10, 10
topk_list = [5, 7, 10, 15]

training_set = TSDatasetSupervised('TRAIN')
test_set = TSDatasetSupervised('TEST')

batch_size_tr = len(training_set.x)
batch_size_te = len(test_set.x)

training_generator = DataLoader(training_set, batch_size=batch_size_tr,
                                shuffle=True, drop_last=False)
test_generator = DataLoader(test_set, batch_size= batch_size_te,
                                shuffle=True, drop_last=False)

single_model_rel_scores = np.zeros((num_repeats, 100, test_set.x.shape[-1]))
ensemble_model_rel_scores = np.zeros((num_repeats, 100, test_set.x.shape[-1]))

for repeat_n in range(num_repeats):

    model_list = []

    print(f"Run number {repeat_n}")

    for m in range(M):

        print(f"Model number {m}")

        model = CNN(training_set.x.shape[1], training_set.n_c).to('cuda')
        criterion = th.nn.CrossEntropyLoss()
        optimizer = th.optim.Adam(model.parameters())

        LossList, AccList, epochs = [], [], 150

        for i in range(epochs):
            for xtr, ytr in training_generator:

                model.train()
                optimizer.zero_grad()

                xtr = xtr.to('cuda')
                out = model(xtr)

                loss = criterion(out, ytr)
                loss.backward()
                LossList.append(loss.item())
                optimizer.step()

            with th.no_grad():
                model.eval()
                for xte, yte in test_generator:

                    out = model(xte)
                    y_hat = th.argmax(out, 1)
                    accuracy = th.eq(yte, y_hat).sum().float() / len(xte)
                    AccList.append(accuracy.item())

        model_list.append(copy.deepcopy(model))

    interpret_array = model_list[np.random.randint(0, len(model_list), 1)[0]].CAM(test_set.x)
    single_model_rel_scores[repeat_n] = np.take_along_axis(interpret_array,
                                                           to_np(test_set.y[:, None, None]), axis=2).squeeze()


    interpret_array_ensemble = np.array([model.CAM(test_set.x) for model in model_list])
    interpret_array_ensemble = interpret_array_ensemble.mean(0)

    ensemble_model_rel_scores[repeat_n] = np.take_along_axis(interpret_array_ensemble,
                                                             to_np(test_set.y[:, None, None]), axis=2).squeeze()

    clear_output()

for topk_idx, topk in enumerate(topk_list):

    interpret_idx = single_model_rel_scores.argsort(2)[:, :, -topk:]
    sim_array = np.zeros((num_repeats, num_repeats))

    for i in range(num_repeats):
        for j in range(num_repeats):
            sim_array[i, j] = sum([np.isin(interpret_idx[i, n], interpret_idx[j, n]).sum() / topk for n in range(len(test_set.x))]) / len(test_set.x)

    print(f"Consistency for single model: {sim_array.mean()}")

for topk_idx, topk in enumerate(topk_list):

    interpret_idx = ensemble_model_rel_scores.argsort(2)[:, :, -topk:]
    sim_array = np.zeros((num_repeats, num_repeats))

    for i in range(num_repeats):
        for j in range(num_repeats):
            sim_array[i, j] = sum([np.isin(interpret_idx[i, n], interpret_idx[j, n]).sum() / topk for n in range(len(test_set.x))]) / len(test_set.x)

    print(f"Consistency for ensemble model: {sim_array.mean()}")