# Cross-Device Federated Unsupervised Learning of Electrocardiogram Signals

This is the analysis script of the results of the federated learning and on-device fine-tuning (personalized) routines exported from the mobile devices to the results folder.

In [None]:
import glob
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn import metrics
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.metrics import balanced_accuracy_score

## 1. Loading of the Data

In [None]:
def load_results(path):
    pd.set_option('future.no_silent_downcasting', True)
    df = pd.DataFrame()
    
    for file in glob.glob(path + "*.csv"):
        temp = pd.read_csv(file, header=None)
        temp.columns = ["segment", "quality", "groundtruth", "residual", "embedding"]
        temp["device"] = int(file.rsplit('/', 1)[-1].replace("device_", "").replace("_ecg_data.csv", ""))

        temp_ = temp['embedding'].str.split(';', expand=True)
        col_names_embedding = [f'lvsd_{i+1}' for i in range(temp_.shape[1])]
    
        temp_.columns = col_names_embedding
        temp_ = temp_.astype(float)
    
        temp = pd.concat([temp, temp_], axis=1)
        temp = temp.drop('embedding', axis=1)
        
        group_means = temp.groupby('device')[col_names_embedding].transform('mean')
        temp['euclidean_distance'] = np.sqrt(((temp[col_names_embedding] - group_means) ** 2).sum(axis=1))
            
        df = pd.concat([df, temp])

    df.groundtruth = df.groundtruth.replace({"Normal": 0, "PAC": 1, "PVC": 2})
    df.reset_index(inplace=True, drop=True)
    
    return df

In [None]:
def standardize(group):
    return (group - group.mean()) / group.std()

In [None]:
df_federated = load_results("./results/federated/")
df_personalized = load_results("./results/personalized/")

In [None]:
df_federated["residual_standardized"] = df_federated.groupby('device')['residual'].transform(standardize)
df_personalized["residual_standardized"] = df_personalized.groupby('device')['residual'].transform(standardize)

df_federated["euclidean_distance_standardized"] = df_federated.groupby('device')['euclidean_distance'].transform(standardize)
df_personalized["euclidean_distance_standardized"] = df_personalized.groupby('device')['euclidean_distance'].transform(standardize)

In [None]:
test_segments = pd.read_csv("./results/test_segments.csv")

In [None]:
df_federated["test"] = df_federated.segment.isin(test_segments.testSegment)
df_personalized["test"] = df_personalized.segment.isin(test_segments.testSegment)

## 2. Data Insights

First, we want to check the residual per segment, which is directly related to the loss of the optimization process. Since we know that one segment was the test data set, there should be no segment that differs greatly from the others in terms of the residual/error.

### 2.1. Functions

In [None]:
def plot_loss_segments(df, path, mode="federated"):
    
    df = df.sort_values(by=["device"])
    segment_loss = df.groupby(["device", "segment"])["residual"].mean().reset_index()
    segment_loss['segments'] = segment_loss.groupby(['device']).cumcount()
    segment_loss.reset_index(drop=True, inplace=True)
    segment_loss = segment_loss.merge(df[['device', 'segment', 'test']], on=['device', 'segment'])
    
    g = sns.FacetGrid(segment_loss, col="device", col_wrap=4, height=4, aspect=2)
    
    def barplot_with_highlight(data, **kwargs):
        sns.barplot(data=data, x="segments", y="residual", hue="test", dodge=False, palette={True: 'orange', False: 'royalblue'}, legend=True, **kwargs)
    
    g.map_dataframe(barplot_with_highlight)
    
    handles = [plt.Line2D([0], [0], color='orange', lw=8, label='Train'),
               plt.Line2D([0], [0], color='royalblue', lw=8, label='Test')]
    plt.legend(handles=handles,title='Segment', loc='upper right', labels=['Test', 'Train'], fontsize=30, title_fontsize=30)
    
    g.set_titles("Device {col_name}", size=30)
    g.fig.subplots_adjust(wspace=0.1, hspace=0.2)
    g.set_axis_labels("Segments", "Residual (MAE)", fontsize=30)
    g.set(xticks=[])

    for ax in g.axes.flat:
        ticks = ax.get_yticks()
        new_ticks = [0.0, 0.05, 0.1]
        ax.set_yticks(new_ticks)
        ax.set_yticklabels(new_ticks, fontsize=30)
        
    g.fig.suptitle(f"Average Reconstruction Error (Mean Absolute Error) of the Segments for the {mode} Model", fontsize=30, y=1.05)
    
    plt.savefig(path, dpi=300, bbox_inches="tight")
    plt.show()

In [None]:
def plot_embedding_space(df, label="groundtruth", method="pca", path="./media/embedding.png", fontsize=12, num_samples = 50000):
    
    col_names_embedding = [f'lvsd_{i+1}' for i in range(12)]
    X = df[col_names_embedding].values
    color = df[label]

    row_indices = np.random.choice(X.shape[0], num_samples, replace=True)
    X = X[row_indices, :]
    if method == "tsne":
        algo = TSNE(n_components=2, learning_rate='auto', init='random', perplexity=3)
    else:
        algo = PCA(n_components=2)

    X_emb = algo.fit_transform(X)
    X_emb = pd.DataFrame(X_emb, columns=["A1", "A2"])
    X_emb["label"] = color.values[row_indices]
    
    fig, ax = plt.subplots(1, 1, figsize=(5, 5)) #plt.figure(figsize=(5, 5))
    palette = sns.color_palette("tab20", len(df[label].unique()))
    sns.scatterplot(data=X_emb, x="A1", y="A2", hue="label", palette=palette, s=2, legend=False)

    ax.set_xlabel("First Embedding Axis", fontsize=fontsize)
    ax.set_ylabel("Second Embedding Axis", fontsize=fontsize)
    ax.tick_params(axis='both', which='major', labelsize=fontsize)
    
    plt.savefig(path, dpi=300, bbox_inches="tight")
    plt.show()

In [None]:
def plot_data_distribution(df, path="./media/data_distribution.png"):
    fig, axes = plt.subplots(nrows=5, ncols=4, figsize=(15, 12), sharex=True, sharey=True)
    fig.subplots_adjust(hspace=0.2, wspace=0.05)
    axes = axes.flatten()

    fig.suptitle('Class Distribution of Heartbeat Segments by Device/Subject', fontsize=16)
    
    for i, ax in enumerate(axes):
        if i < len(df):
            device_subject = df.loc[i, 'Device/Subject']
            values = df.loc[i, ['Normal', 'PAC', 'PVC']]
            bars = values.plot(kind='bar', ax=ax)
    
            # Add numbers on the bars
            for bar in bars.patches:
                height = bar.get_height()
                ax.text(
                    bar.get_x() + bar.get_width() / 2,
                    height,
                    f'{int(height)}',
                    ha='center',
                    va='bottom'
                )
    
            ax.set_title(device_subject)
            ax.set_ylabel('Count')
            ax.set_xticklabels(['Normal', 'PAC', 'PVC'], rotation=45)
    
    for j in range(len(df), len(axes)):
        fig.delaxes(axes[j])
    plt.tight_layout(rect=[0, 0, 1, 0.99])
    plt.savefig(path, dpi=300, bbox_inches="tight")
    plt.show()

### 2.2. Insights

In [None]:
plot_loss_segments(df_federated, "./media/loss_segment_federated.png", mode="Federated")

In [None]:
plot_loss_segments(df_personalized, "./media/loss_segment_personalized.png", mode="Fine-tuned")

Distribution of annotations among the twenty devices:

In [None]:
device_subject_mapping = pd.read_csv("./results/device_subject.csv")

In [None]:
df_class_distribution = pd.pivot_table(
    df_federated, values='quality', index=['device'], columns=['groundtruth'], aggfunc="count",
).rename({0: "Normal", 1: "PAC", 2: "PVC"}, axis=1).reset_index().merge(device_subject_mapping, on="device")

In [None]:
df_class_distribution["Device/Subject"] = "Device " + df_class_distribution.device.astype(str) + " / " + df_class_distribution.subject

In [None]:
plot_data_distribution(df_class_distribution)

In [None]:
print("Total ECG segments", "\t Personalized:", len(df_personalized), "\t Federated:", len(df_federated))
print("ECG segments normal", "\t Personalized:", len(df_personalized[df_personalized.groundtruth == 0]), "\t Federated:", len(df_federated[df_federated.groundtruth == 0]))
print("ECG segments anomal", "\t Personalized:", len(df_personalized[df_personalized.groundtruth > 0]), "\t Federated:", len(df_federated[df_federated.groundtruth > 0]))
print("ECG segments PAC", "\t Personalized:", len(df_personalized[df_personalized.groundtruth == 1]), "\t Federated:", len(df_federated[df_federated.groundtruth == 1]))
print("ECG segments PVC", "\t Personalized:", len(df_personalized[df_personalized.groundtruth == 2]), "\t Federated:", len(df_federated[df_federated.groundtruth == 2]))
print("ECGs medium quality", "\t Personalized:", len(df_personalized[df_personalized.quality == "medium"]), "\t Federated:", len(df_federated[df_federated.quality == "medium"]))
print("ECGs excellent quality", "\t Personalized:", len(df_personalized[df_personalized.quality == "excellent"]), "\t Federated:", len(df_federated[df_federated.quality == "excellent"]))

In [None]:
plot_embedding_space(df_federated, label="device", method="tsne", fontsize=12, num_samples=10_000)

## 3. Anomaly Detection

### 3.1. Functions

In [None]:
def get_best_threshold(fpr, tpr, thresholds):
    distances = np.sqrt((1 - tpr) ** 2 + fpr ** 2)
    best_index = np.argmin(distances)
    best_threshold = thresholds[best_index]
    return best_threshold

def plot_roc_curve_comparison(ax, df_federated, df_personalized, device=None, fontsize=12):
    
    if device is not None:
        df_federated = df_federated.loc[df_federated.device == device,:]
        df_personalized = df_personalized.loc[df_personalized.device == device,:]

    y = df_federated.groundtruth > 0
    
    pred = df_federated.residual_standardized
    fpr, tpr, thresh = metrics.roc_curve(y, pred)
    thres_fed_res = get_best_threshold(fpr, tpr, thresh)
    auc = metrics.roc_auc_score(y, pred)
    ax.plot(fpr, tpr, label="Fed Res (AUC=" + str(np.round(auc, 2)) + ")")

    pred = df_federated.combined
    fpr, tpr, thresh = metrics.roc_curve(y, pred)
    thres_fed_resemb = get_best_threshold(fpr, tpr, thresh)
    auc = metrics.roc_auc_score(y, pred)
    ax.plot(fpr, tpr, label="Fed ResEmb (AUC=" + str(np.round(auc, 2)) + ")")

    pred = df_personalized.residual_standardized
    fpr, tpr, thresh = metrics.roc_curve(y, pred)
    thres_ft_res = get_best_threshold(fpr, tpr, thresh)
    auc = metrics.roc_auc_score(y, pred)
    ax.plot(fpr, tpr, label="FT Res (AUC=" + str(np.round(auc, 2)) + ")")
    
    pred = df_personalized.combined
    fpr, tpr, thresh = metrics.roc_curve(y, pred)
    thres_ft_resemb = get_best_threshold(fpr, tpr, thresh)
    auc = metrics.roc_auc_score(y, pred)
    ax.plot(fpr, tpr, label="FT ResEmb (AUC=" + str(np.round(auc, 2)) + ")")
    
    if device != None:
        ax.set_title(f"Device {str(device)}", size=fontsize+4)
    ax.legend(loc=0, fontsize=fontsize)
    ax.tick_params(axis='both', which='major', labelsize=fontsize)

    return thres_fed_res, thres_fed_resemb, thres_ft_res, thres_ft_resemb

def create_roc_curve_grid(df_federated, df_personalized, rows, cols, fontsize=14, path="./media/roc_curve_comparison_grid.png"):
    fig, axes = plt.subplots(rows, cols, figsize=(5*cols, 5*rows), sharex=True, sharey=True)
    thresholds = []

    for k in range(1, rows * cols + 1):
        device = k
        ax = axes[(k-1) // cols, (k-1) % cols]
        _, _, _, t_ft_resemb = plot_roc_curve_comparison(ax, df_federated, df_personalized, device=device, fontsize=fontsize)
        thresholds.append(t_ft_resemb)

    fig.suptitle('Individual ROC AUC Comparison for Each Device', fontsize=fontsize + 4)
    fig.text(0.5, -0.01, 'False Positive Rate', ha='center', fontsize=fontsize + 4)
    fig.text(-0.01, 0.5, 'True Positive Rate', va='center', rotation='vertical', fontsize=fontsize + 4)

    plt.tight_layout(rect=[0, 0, 1, 0.985])
    
    plt.savefig(path, dpi=300, bbox_inches="tight")
    plt.show()
    return thresholds

def create_roc_curve_single(df_federated, df_personalized, fontsize=12, path="./media/roc_curve_comparison.png"):
    fig, ax = plt.subplots(1, 1, figsize=(5, 5))
    t_fed_res, t_fed_resemb, t_ft_res, t_ft_resemb = plot_roc_curve_comparison(
        ax,
        df_federated,
        df_personalized,
        fontsize=fontsize,
    )

    ax.set_xlabel('False Positive Rate', fontsize=fontsize)
    ax.set_ylabel('True Positive Rate', fontsize=fontsize)
    
    plt.tight_layout()
    plt.savefig(path, dpi=300, bbox_inches="tight")
    plt.show()

    return t_fed_res, t_fed_resemb, t_ft_res, t_ft_resemb

def plot_confusion_matrix(ax, df, threshold, fontsize=14, path="./media/confusion_matrix.png"):
    
    y_true = df.groundtruth > 0
    y_pred = df.combined > threshold

    bas = balanced_accuracy_score(y_true, y_pred)
    
    custom_labels = ['Normal', 'Anomaly']
    cm = metrics.confusion_matrix(y_true, y_pred)
    
    disp = metrics.ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=custom_labels)
    disp.plot(ax=ax, cmap=plt.cm.Blues, colorbar=False)

    return cm, bas

def calculate_cm_metrics(cm):
    TN, FP, FN, TP = cm.ravel()
    
    specificity = TN / (TN + FP)
    sensitivity = TP / (TP + FN)
    f1_score = (2*TP) / (2*TP + FP + FN)

    scores = {
        "sensitivity": sensitivity,
        "specificity": specificity,
        "f1": f1_score,
    }
    
    return scores

def confusion_matrix_metrics(df, threshold):
    fig, axes = plt.subplots(1, 1, figsize=(5, 5))
    cm_single, bas_single = plot_confusion_matrix(axes, df, threshold)
    plt.tight_layout()
    plt.savefig("./media/confusion_matrix.png", dpi=300, bbox_inches="tight")
    plt.show()
    metrics_cm = calculate_cm_metrics(cm_single)
    print(
        "Overall",
        f"Threshold: {np.round(threshold, 4)}",
        f"Sensitivity {np.round(metrics_cm['sensitivity'], 4)}\t",
        f"Specificity {np.round(metrics_cm['specificity'], 4)}\t",
        "Balanced Accuracy:", np.round(bas_single, 4),
    )

### 3.2. Results

In [None]:
df_federated["combined"] = df_federated.euclidean_distance_standardized + df_federated.residual_standardized
df_federated["combined"] = df_federated.groupby('device')['combined'].transform(standardize)

df_personalized["combined"] = df_personalized.euclidean_distance_standardized + df_personalized.residual_standardized
df_personalized["combined"] = df_personalized.groupby('device')['combined'].transform(standardize)

In [None]:
filter_noisy_signal = False
if filter_noisy_signal:
    mask_federated = ~((df_federated.quality == "medium") & (df_federated.groundtruth == 0))
    mask_personalized = ~((df_personalized.quality == "medium") & (df_personalized.groundtruth == 0))
    df_federated = df_federated[mask_federated].reset_index(drop=True)
    df_personalized = df_personalized[mask_personalized].reset_index(drop=True)

In [None]:
t_fed_res, t_fed_resemb, t_ft_res, t_ft_resemb = create_roc_curve_single(df_federated, df_personalized, fontsize=12)

In [None]:
thresholds_all = create_roc_curve_grid(df_federated, df_personalized, rows=5, cols=4, fontsize=14)

In [None]:
for k in [t_fed_res, t_fed_resemb]:
    confusion_matrix_metrics(df_federated, k)

In [None]:
for k in [t_ft_res, t_ft_resemb]:
    confusion_matrix_metrics(df_personalized, k)

In [None]:
# PAC
confusion_matrix_metrics(df_personalized[df_personalized.groundtruth != 2], t_ft_resemb)

In [None]:
# PVC
confusion_matrix_metrics(df_personalized[df_personalized.groundtruth != 1], t_ft_resemb)

In [None]:
mask_fp_mQuality = (
    (df_personalized.groundtruth > 0) != (df_personalized.combined > t_ft_resemb)
) & (
    df_personalized.groundtruth == 0
) & (
    df_personalized.quality == "medium"
)
print("Number of FP with a medium signal quality:", len(df_personalized[mask_fp_mQuality]))

In [None]:
rows = 5
cols = 4
cms = []
bas_all = []

fig, axes = plt.subplots(rows, cols, figsize=(5*cols, 5*rows), sharex=True, sharey=True)
axes = axes.flatten()

for k in range(1, rows * cols + 1):
    ax = axes[k-1]
    cm, bas = plot_confusion_matrix(
        ax,
        df_personalized[df_personalized.device == k],
        thresholds_all[k-1]
    )
    ax.set_title(f"Device {str(k)}", size=12+4)
    cms.append(cm)
    bas_all.append(bas)

fig.suptitle('Individual Confusion Matrix for Each Device', fontsize=12 + 4)

plt.tight_layout(rect=[0, 0, 1, 0.985])
plt.savefig("./media/confusion_matrix_all.png", dpi=300, bbox_inches="tight")
plt.show()

In [None]:
cm_all = np.zeros((2,2))
for i, k in enumerate(cms):
    metrics_cm = calculate_cm_metrics(k)
    print(
        f"Device {i+1}\t",
        f"Subject {device_subject_mapping[device_subject_mapping.device == i+1]['subject'].iloc[0]}\t",
        f"Threshold {np.round(thresholds_all[i], 4)}\t",
        f"Sensitivity {np.round(metrics_cm['sensitivity'], 4)}\t",
        f"Specificity {np.round(metrics_cm['specificity'], 4)}\t",
        f"Balanced Accuracy {np.round(bas_all[i], 4)}",
    )
    cm_all += k

In [None]:
cm_all

In [None]:
calculate_cm_metrics(cm_all)

## 4. Appendix

### 4.1. Federated Model

Loading the weights from the federated learning process to either make predictions on ECG data or to sample ECGs from the latent vector space (though no variational autoencoders). 

In [None]:
import torch
from io import BytesIO
from autoencoder import AE
from matplotlib import pyplot as plt
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets

Load weights and transform them into a numpy array to inject them into the shared autoencoder model.

In [None]:
path_weights = "../src/federated/weights/round-250-weights.npz"
data = np.load(path_weights, allow_pickle=True)
parameters_serialized = data['arr_0'].item()
pretrained_weights = [np.load(BytesIO(tensor), allow_pickle=False) for tensor in parameters_serialized.tensors]

In [None]:
model = AE()
with torch.no_grad():
    for k, name in enumerate(model.named_parameters()):
        if k % 2 == 0:
            param = name[1]
            param.copy_(torch.from_numpy(pretrained_weights[int(k/2)]))

The reconstructed samples of the autoencoder optimized by the federated learning routine tend to be noisy. Therefore, we introduce a simple smoothing function.

In [None]:
def moving_average(signal, window_size):
    window = np.ones(int(window_size))/float(window_size)
    return np.convolve(signal, window, 'same')

Systematically toggling the values in the latent vector space is used to analyze the change in the reconstruction and thus possibly give the dimensions an interpretation.

In [None]:
def f(val, dim):
    # a randomly chosen encoding from the df_personalized dataframe
    embedding = [0.97, -1.57, 3.55, 3.67, -1.82, 3.61, -0.53, -2.51, 4.461, 2.12, 4.73, -0.74]
    my_tensor = torch.tensor(embedding, dtype=torch.float32)
    my_tensor[dim] = val
    reconstruction = moving_average(model.decoder(my_tensor).detach().numpy(), 10)
    plt.plot(reconstruction)

In [None]:
floatSlider = widgets.FloatSlider(min=-10, max=10, step=0.1, value=0)

In [None]:
interact(f, val=floatSlider, dim=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11])

In [None]:
embedding = [0.97, -1.57, 3.55, 3.67, -1.82, 3.61, -0.53, -2.51, 4.461, 2.12, 4.73, -0.74]
my_tensor = torch.tensor(embedding, dtype=torch.float32)

fig, axes = plt.subplots(nrows=4, ncols=3, figsize=(15, 15), sharex=True, sharey=True)
axes = axes.flatten()

for k in range(12):
    ax = axes[k]
    my_tensor = torch.tensor(embedding, dtype=torch.float32)
    for j in np.linspace(-5, 5, 5):
        my_tensor[k] = j
        # apply a smoothin operation to gain denoised signals
        reconstruction = moving_average(model.decoder(my_tensor).detach().numpy(), 10)
        ax.plot(reconstruction)
    ax.set_title(f'Dimension={k+1}')

fig.suptitle('Reconstruction Plots for Toggling the Value of Each Latent Vector Space Dimension', fontsize=16)

plt.tight_layout(rect=[0, 0, 1, 0.99])

plt.savefig("./media/reconstructions.png", dpi=300, bbox_inches="tight")
plt.show()

### 4.2. Sample Data Plot

To include a visual data example in the respective article, a suitable example is loaded from the Icentia11k dataset and the three relevant label classes (normal, PAC, PVC) and the corresponding ECGs are plotted.

In [None]:
# this sample needs to be downloaded manually
df_plot = pd.read_csv("./sample/p03984_24.csv")

In [None]:
index_pac = df_plot[df_plot.annotation == "pac"].index
index_pvc = df_plot[df_plot.annotation == "pvc"].index
index_normal = df_plot[df_plot.annotation == "normal"].index

In [None]:
values_pac = df_plot.iloc[(index_pac[2]-125):(index_pac[2]+125), 0].reset_index(drop=True).reset_index()
values_pvc = df_plot.iloc[(index_pvc[2]-125):(index_pvc[2]+125), 0].reset_index(drop=True).reset_index()
values_normal = df_plot.iloc[(index_normal[2]-125):(index_normal[2]+125), 0].reset_index(drop=True).reset_index()

In [None]:
values_pac["annotation"] = "PAC"
values_pvc["annotation"] = "PVC"
values_normal["annotation"] = "Normal"

In [None]:
values = pd.concat([values_pac, values_pvc, values_normal])

In [None]:
custom_texts = {
    'PAC': 'PAC',
    'PVC': 'PVC',
    'Normal': 'Normal'
}

palette = sns.color_palette("husl", len(values['annotation'].unique()))
g = sns.FacetGrid(values, row="annotation", aspect=5, height=2, despine=False)
g.map_dataframe(sns.lineplot, x="index", y="signal", hue="annotation", palette=palette)

for ax, annotation in zip(g.axes.flatten(), custom_texts.keys()):
    ax.set_ylabel('')
    ax.set_xlabel('')
    ax.tick_params(left=False, bottom=False)
    ax.get_yaxis().set_visible(False)
    ax.get_xaxis().set_visible(False)
    ax.set_title('')
    for spine in ax.spines.values():
        spine.set_visible(False)
    
    x_position = 135
    y_position = values[values['annotation'] == annotation]['signal'].mean() + 0.2
    ax.text(x_position, y_position, custom_texts[annotation], verticalalignment='center', fontsize=14)
    
g.fig.subplots_adjust(hspace=0)

g.savefig('./media/data.png', dpi=300, bbox_inches="tight")
plt.show()