# Librerie e funzioni d'utilità

In [None]:
import os
import wfdb as wf
import numpy as np
from matplotlib import pyplot as plt
import seaborn as sns
import matplotlib.colors
import pandas as pd
import warnings
import shutil
warnings.filterwarnings('ignore')
%matplotlib inline

In [None]:
# Named colours used by the plots (Italian names: cornflower blue, lilac, black)
blu_fiordaliso = "#6495ED"
lilla = "#c8a2c8"
nero = "#000000"

# Colour stops from white to dark purple, interpolated into a continuous colormap
gradient = [
    "#ffffff",
    "#dcc4dc",
    "#c8a2c8",
    "#a787ad",
    "#93779c",
    "#735d82",
    "#6c4675",
]
my_cmap1 = matplotlib.colors.LinearSegmentedColormap.from_list("", gradient)

In [None]:
# Colormap whose colour stops sit at the normalised values of a fixed 2x2 matrix
matrix = np.array([[149030, 34], [7442, 12]])
norm = matplotlib.colors.Normalize(matrix.min(), matrix.max())

# Ascending cell values; the smallest normalises to 0 and the largest to 1
boundaries = sorted(matrix.flatten().tolist())
colors = [
    [norm(boundary), shade]
    for boundary, shade in zip(boundaries, ("#dcc4dc", "#c8a2c8", "#93779c", "#6c4675"))
]
my_cmap2 = matplotlib.colors.LinearSegmentedColormap.from_list("", colors)

In [None]:
def remove_and_make_dir(path):
    """Recreate ``path`` as an empty directory.

    If ``path`` already exists, its whole directory tree is deleted first.
    The directory is then created again, together with any missing parent
    directories.

    Args:
        path: Directory to reset. Everything stored under it is lost.
    """
    if os.path.exists(path):
        shutil.rmtree(path)
    # makedirs (instead of mkdir) also works when the parent does not exist yet
    os.makedirs(path)

In [None]:
# File name of the CSV export of the heartbeat dataset
heartbeats_datasets = "heartbeats_dataset.csv"

# Output directories relative to the working directory. They are built with
# the platform separator instead of hard-coded backslashes; the trailing ""
# keeps the final separator, which `directory + file_name` concatenations need.
base_heartbeats_path = os.path.join(os.curdir, "heartbeats", "")
plot_path = os.path.join(os.curdir, "plot", "")

In [None]:
# Start from empty output directories: anything left from a previous run is deleted
for directory in (base_heartbeats_path, plot_path):
    remove_and_make_dir(directory)

In [None]:
# Local directory holding the downloaded "mitdb" records, built with the
# platform separator instead of hard-coded backslashes (trailing separator kept)
directory_dataset = os.path.join(os.curdir, "mitdb", "")

In [None]:
# Create the dataset directory and download the "mitdb" database into it,
# but only when the directory does not exist yet
if not os.path.exists(directory_dataset):
    os.mkdir(directory_dataset)
    # The package is imported as `wf`; the bare name `wfdb` is not defined
    wf.dl_database("mitdb", directory_dataset)

# wf.io.show_ann_classes()
# wf.io.show_ann_labels()

In [None]:
# Display wfdb's table of annotation labels (reference for the symbols counted below)
wf.io.show_ann_labels()

# Distribuzione etichette

In [None]:
# Unique record names in the dataset directory: every entry with its last
# extension stripped, de-duplicated and sorted
list_of_file = sorted({entry.rsplit('.', 1)[0] for entry in os.listdir(directory_dataset)})

In [None]:
# Maps each record name to [record tuple (samples, metadata dict), annotation object]
dataset = {}

for filename in list_of_file:
    file = os.path.join(directory_dataset, filename)

    # Load the samples and the 'atr' annotations of this record
    record, annotation = wf.rdsamp(file), wf.rdann(file, 'atr')
    dataset[filename] = [record, annotation]

    # Short summary of what was loaded
    print(
        f"File: {file}",
        f"Sampling frequency: {record[1].get('fs')}",
        f"Data shape: {record[0].shape}",
        f"Annotations: {len(annotation.num)}",
        "\n",
        sep="\n",
    )

In [None]:
# Distribution of annotation symbols over the whole dataset
from collections import Counter

label_counts = Counter()
for record in dataset.values():
    # record[1] is the annotation object; .symbol holds one symbol per annotation
    label_counts.update(record[1].symbol)

# Plain dict ordered from the most to the least frequent symbol
labels = dict(label_counts.most_common())

In [None]:
def bar_plot_ex(keys, values, title, path, dim):
    """Draw a log-scale bar chart, save it under ``plot_path`` and show it.

    Args:
        keys: Bar labels placed on the x axis.
        values: Bar heights, one per key.
        title: Title of the figure.
        path: File name appended to the module-level ``plot_path``.
        dim: Figure size passed to matplotlib as ``figsize``.
    """
    fig, ax = plt.subplots(figsize=dim)
    ax.set_yscale("log")
    ax.grid(color=nero, linestyle='-', linewidth=0.5, axis="y")
    ax.set_title(title)
    ax.bar(keys, values, width=1, color=lilla, edgecolor=nero, linewidth=0.5, align='center')
    # Vertical tick labels on the x axis
    plt.setp(ax.get_xticklabels(), rotation=90)
    fig.savefig(plot_path + path, bbox_inches='tight', transparent=True)
    plt.show()

In [None]:
# Bar chart of the overall label distribution; the output file name is the
# lower-cased title with spaces replaced by underscores
title = "Distribuzione delle label nel dataset"
file_name = title.lower().replace(" ", "_")
keys = labels.keys()
values = list(labels.values())
dim = (20, 5)
bar_plot_ex(keys, values, title, file_name, dim)

In [None]:
# Count every label separately for each record
tmp = {}
for key, record in dataset.items():
    per_file = dict.fromkeys(labels, 0)
    for label in record[1].symbol:
        per_file[label] += 1
    tmp[key] = per_file

# One row per record and one column per label; zero counts are replaced by NaN
df = pd.DataFrame.from_dict(tmp).replace(0, np.nan).T

In [None]:
# Heatmap of the label counts per record (rows = records, columns = labels)
fig, ax = plt.subplots(figsize=(20, 20))
title = "Distribuzione label per file"
file_name = title.lower().replace(" ", "_")

heatmap = sns.heatmap(df, ax=ax, annot=True, fmt=".0f", cmap="Purples", cbar_kws={"shrink": .5})

# Set the title and axis labels only after drawing: sns.heatmap assigns the
# axis labels itself from the DataFrame index/column names (empty here), so
# labels set beforehand would be overwritten
ax.set_title(title)
ax.set_xlabel("Label")
ax.set_ylabel("File")

# Horizontal, larger tick labels on both axes
heatmap.set_yticklabels(heatmap.get_yticklabels(), rotation=0, fontsize=12)
heatmap.set_xticklabels(heatmap.get_xticklabels(), rotation=0, fontsize=12)
fig.savefig(plot_path + file_name, bbox_inches='tight', transparent=True)

# Plot ECG

In [None]:
def plot_ecg(channel, sample_start, sample_size, record, annotation):
    """Plot a window of one ECG channel together with its annotations.

    Args:
        channel: Column index of the signal inside ``record[0]``.
        sample_start: Index of the first sample of the window.
        sample_size: Requested number of samples in the window.
        record: Pair ``(samples, metadata)``; ``metadata`` provides ``'fs'``
            (sampling frequency) and ``'sig_name'`` (channel names).
        annotation: Object exposing ``sample`` (annotation positions) and
            ``symbol`` (one symbol per annotation).
    """
    fs = record[1].get('fs')
    sample_end = sample_start + sample_size
    signal = record[0][sample_start:sample_end, channel]

    # The slice is shorter than sample_size when the window runs past the end
    # of the record, so the time axis is built from the samples actually
    # obtained; otherwise plt.plot fails on mismatched lengths.
    window_end = sample_start + len(signal)
    times = (np.arange(len(signal), dtype='float') + sample_start) / fs
    plt.figure(figsize=(20, 7))
    plt.plot(times, signal)

    # Annotations falling inside the plotted window, as offsets into it
    where = np.logical_and(annotation.sample >= sample_start, annotation.sample < window_end)
    annots = annotation.sample[where] - sample_start
    annotypes = np.array(annotation.symbol)[where]
    annotimes = times[annots]

    # Markers and symbols are placed relative to the signal maximum
    peak = signal.max() if signal.size else 0.0
    plt.plot(annotimes, np.ones_like(annotimes) * peak * 1.4, 'ro')
    for symbol, moment in zip(annotypes, annotimes):
        plt.annotate(symbol, xy=(moment, peak * 1.1))

    plt.xlim([sample_start / fs, sample_end / fs])
    plt.xlabel('Offset')
    plt.ylabel(record[1].get('sig_name')[channel])
    plt.show()

In [None]:
# Channel to plot (original note: there are 2 channels, the MLII wave is 0)
channel = 0

# First sample of the window and number of samples to plot
# (original note: 360 readings per second)
sample_start = 0
sample_size = 4000

# Samples/metadata tuple and annotations of record "100"
record, annotation = dataset["100"]

plot_ecg(channel, sample_start, sample_size, record, annotation)

# Creazione dataset di heartbeat

In [None]:
def get_heartbeat(channel, sample_start, sample_size, record, annotation):
    """Extract one window of a channel together with its annotations.

    Args:
        channel: Column index of the signal inside ``record[0]``.
        sample_start: Index of the first sample of the window.
        sample_size: Number of samples requested for the window.
        record: Pair ``(samples, metadata)``; ``metadata`` provides ``'fs'``.
        annotation: Object exposing ``sample`` (annotation positions) and
            ``symbol`` (one symbol per annotation).

    Returns:
        Tuple ``(signal, times, annotypes, annotimes, annots)``: the sliced
        signal, the time of each of the ``sample_size`` window positions,
        the symbols of the annotations inside the window, their times and
        their offsets relative to ``sample_start``.
    """
    samples, fields = record[0], record[1]
    stop = sample_start + sample_size
    signal = samples[sample_start:stop, channel]

    # Time of every window position: sample index divided by sampling frequency
    times = (np.arange(sample_size, dtype='float') + sample_start) / fields.get('fs')

    # Keep the annotations located in [sample_start, stop)
    positions = annotation.sample
    in_window = (positions >= sample_start) & (positions < stop)
    annots = positions[in_window] - sample_start
    annotypes = np.array(annotation.symbol)[in_window]
    annotimes = times[annots]

    return (signal, times, annotypes, annotimes, annots)

In [None]:
def plot_heartbeat(signal, times, annotypes, annotimes, annots, ylabel=None):
    """Plot one extracted heartbeat window with its annotations.

    Args:
        signal: Signal samples of the window.
        times: Time of each window position.
        annotypes: Symbols of the annotations inside the window.
        annotimes: Times of those annotations.
        annots: Offsets of those annotations into ``times``.
        ylabel: Label of the y axis. When omitted, the label is read from the
            notebook-level ``record`` and ``channel`` variables, as before.
    """
    if ylabel is None:
        # Backward-compatible default: depends on globals set by other cells
        ylabel = record[1].get('sig_name')[channel]

    peak = signal.max()
    plt.figure(figsize=(20, 7))
    plt.plot(times, signal)
    # Markers and symbols are placed relative to the signal maximum
    plt.plot(annotimes, np.ones_like(annotimes) * peak * 1.4, 'ro')

    for idx, annot in enumerate(annots):
        plt.annotate(annotypes[idx], xy=(times[annot], peak * 1.1))

    plt.xlabel('Offset')
    plt.ylabel(ylabel)
    plt.show()

In [None]:
# Cut one fixed-size window around every annotation of every record
channel = 0
heartbeat_size = 300
ds = []
for key, (record, annotation) in dataset.items():
    for pos_of_annotation in annotation.sample:
        # The window starts 149 samples before the annotation, clamped at the record start
        heartbeat_start = max(pos_of_annotation - 149, 0)
        ds.append(get_heartbeat(channel, heartbeat_start, heartbeat_size, record, annotation))

In [None]:
# One row per extracted window, one column per element returned by get_heartbeat
df = pd.DataFrame(
    ds,
    columns=["signal", "times", "annotypes", "annotimes", "annots"],
)
# plot_heartbeat(df["signal"][0], df["times"][0],df["annotypes"][0], df["annotimes"][0], df["annots"][0])
print(f"Numero record: {len(df)}")
display(df.head())

In [None]:
# Dataset for the neural network: keep only the windows that contain exactly
# one annotation and whose signal has the full length of 300 samples
keep = (df['annotypes'].str.len() == 1) & (df['signal'].str.len() == 300)
df1 = df.loc[keep, ["signal", "annotypes"]]
print(f"Numero record: {len(df1)}")
display(df1.head())

In [None]:
# df1.to_csv(base_heartbeats_path + heartbeats_datasets, index=False)

# Addestramento modelli

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.utils import plot_model
from sklearn.model_selection import train_test_split
from d2l import tensorflow as d2l
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import OneHotEncoder

In [None]:
# Keep only the heartbeats whose annotation symbol is 'N' or 'L' (two-class problem)
df2 = df1[(df1['annotypes'] == 'N') | (df1['annotypes'] == 'L')]

In [None]:
# Features: one signal window per heartbeat
X = df2['signal'].tolist()

# Targets: annotation symbols -> integer codes -> one-hot rows
ann = df2['annotypes'].tolist()
tmp = LabelEncoder().fit_transform(ann)
tmp = tmp.reshape(len(tmp), 1)
# The `sparse=False` argument is no longer accepted by recent scikit-learn
# releases (it was renamed `sparse_output`). Converting the default sparse
# result with .toarray() gives the same dense array on old and new versions.
y = OneHotEncoder(categories='auto').fit_transform(tmp).toarray()

In [None]:
# 80/20 train/test split with a fixed seed; every part is converted to a NumPy array
X_train, X_test, y_train, y_test = (
    np.asarray(part)
    for part in train_test_split(X, y, test_size=0.2, random_state=42)
)

### Modello 1

In [None]:
# Model 1: sequential 1D CNN made of two conv-conv-pool stages and a dense classifier
model = keras.models.Sequential()

# Stage 1: two convolutions with 64 filters, then max pooling
model.add(keras.layers.Conv1D(filters=64, kernel_size=5, strides=1, padding="valid", input_shape=[300, 1]))
model.add(keras.layers.Conv1D(filters=64, kernel_size=5, strides=1, padding="valid"))
model.add(keras.layers.MaxPooling1D(pool_size=2, strides=2))

# Stage 2: two convolutions with 128 filters, then max pooling
model.add(keras.layers.Conv1D(filters=128, kernel_size=3, strides=1, padding="valid"))
model.add(keras.layers.Conv1D(filters=128, kernel_size=3, strides=1, padding="valid"))
model.add(keras.layers.MaxPooling1D(pool_size=2, strides=2))

# Classifier: flatten, two hidden dense layers, 2-way softmax output
model.add(keras.layers.Flatten())
model.add(keras.layers.Dense(256, activation=tf.nn.relu))
model.add(keras.layers.Dense(128, activation=tf.nn.relu))
model.add(keras.layers.Dense(2, activation=tf.nn.softmax))

model.summary()

In [None]:
# Compile and train the model for 3 epochs on the training split.
# NOTE(review): 'binary_crossentropy' is used here with one-hot targets;
# 'categorical_crossentropy' is the usual choice for that encoding - confirm intent.
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
model.fit(X_train, y_train, epochs=3, batch_size=32)

In [None]:
# Evaluate on the held-out split.
# NOTE(review): despite the variable name, evaluate() is expected to return the
# loss followed by the compiled metrics (accuracy), not an error rate - confirm.
test_error_rate = model.evaluate(X_test, y_test)

### Modello 2

In [None]:
# Model 2: three parallel convolutional branches over the same input, merged
# and followed by a dense classifier
input_tot = Input(shape=(300, 1), name="Input_tot")

# Branch 1: kernel sizes 4 and 6
branch1_1 = keras.layers.Conv1D(filters=8, kernel_size=4, activation='relu', name="branch1_1")(input_tot)
branch1_2 = keras.layers.MaxPooling1D(pool_size=2, strides=2, name="branch1_2")(branch1_1)
branch1_3 = keras.layers.Conv1D(filters=24, kernel_size=6, activation='relu', name="branch1_3")(branch1_2)
branch1_4 = keras.layers.MaxPooling1D(pool_size=2, strides=2, name="branch1_4")(branch1_3)

# Branch 2: kernel sizes 6 and 8
branch2_1 = keras.layers.Conv1D(filters=8, kernel_size=6, activation='relu', name="branch2_1")(input_tot)
branch2_2 = keras.layers.MaxPooling1D(pool_size=2, strides=2, name="branch2_2")(branch2_1)
branch2_3 = keras.layers.Conv1D(filters=24, kernel_size=8, activation='relu', name="branch2_3")(branch2_2)
branch2_4 = keras.layers.MaxPooling1D(pool_size=2, strides=2, name="branch2_4")(branch2_3)

# Branch 3: kernel sizes 8 and 10
branch3_1 = keras.layers.Conv1D(filters=8, kernel_size=8, activation='relu', name="branch3_1")(input_tot)
branch3_2 = keras.layers.MaxPooling1D(pool_size=2, strides=2, name="branch3_2")(branch3_1)
branch3_3 = keras.layers.Conv1D(filters=24, kernel_size=10, activation='relu', name="branch3_3")(branch3_2)
branch3_4 = keras.layers.MaxPooling1D(pool_size=2, strides=2, name="branch3_4")(branch3_3)

# Merge the three branches along the time axis. The branches differ in number
# of time steps but all end with 24 channels, so axis=1 is the compatible one.
# `concatenate` is reached through `keras.layers` because the bare name is not
# imported in this notebook.
branch_concatenate = keras.layers.concatenate(
    [branch1_4, branch2_4, branch3_4], axis=1, name="concatenated_layer"
)

# Dense layers act on the last axis only: flatten (steps, channels) first so
# the model yields one prediction per heartbeat instead of one per time step
flattened = keras.layers.Flatten(name="flatten_layer")(branch_concatenate)

# Final layers. The output has two softmax units to match the two one-hot
# encoded classes ('N' and 'L') of the targets, as in Model 1.
dense1 = Dense(256, activation="sigmoid", name="dense1")(flattened)
dense2 = Dense(32, activation="sigmoid", name="dense2")(dense1)
output_layer = Dense(2, activation="softmax", name="output_layer")(dense2)

# Model definition
model = Model(inputs=[input_tot], outputs=[output_layer])

# Model details
model.summary()
# keras.utils.plot_model(model, "output/architecture.png", show_shapes=True)

In [None]:
# The targets are one-hot encoded, so the non-sparse categorical loss is used
# ('sparse_categorical_crossentropy' expects integer class indices instead)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
# Train on the training split; the name `train` used previously is not defined anywhere
model.fit(X_train, y_train, epochs=3)

In [None]:
# Evaluate on the held-out split.
# NOTE(review): despite the variable name, evaluate() is expected to return the
# loss followed by the compiled metrics (accuracy), not an error rate - confirm.
test_error_rate = model.evaluate(X_test, y_test)