In [None]:
import sys
import os
import glob
import numpy as np
from tqdm import tqdm
import cv2
import matplotlib.pyplot as plt

In [None]:
# Show the working directory: the relative 'mitbih/' and 'MIT-BIH_DATABASE/' paths used below resolve against it.
print(os.getcwd())
def get_records(data_dir='mitbih'):
    """Return the sorted, extension-less paths of the records found in ``data_dir``.

    Each record consists of several files sharing one stem; the ``*.atr``
    file is used to discover the records. Nothing is downloaded: if the
    directory is missing or holds no ``*.atr`` file the result is empty.

    Args:
        data_dir: Directory that is searched for ``*.atr`` files
            (defaults to the ``mitbih`` folder used by this notebook).

    Returns:
        A sorted list of paths with the ``.atr`` extension stripped,
        e.g. ``'mitbih/100'``.
    """
    annotation_files = glob.glob(os.path.join(data_dir, '*.atr'))
    # splitext drops exactly the extension instead of assuming it is four characters long
    return sorted(os.path.splitext(path)[0] for path in annotation_files)

# Discover the records once; later cells slice this list
records = get_records()
print('There are {n} record files'.format(n=len(records)))
print(records)

In [None]:
def beat_annotations(annotation, type):
    """Return the sample positions of the annotations whose symbol equals ``type``.

    All other annotations (other beat classes and non-beat markers) are dropped.

    Symbols used by this notebook:
        'N' normal beat, 'L' left bundle branch block beat,
        'R' right bundle branch block beat, 'A' atrial premature contraction,
        'V' premature ventricular contraction, '/' paced beat,
        'E' ventricular escape beat, '!' ventricular flutter wave.

    Args:
        annotation: Object exposing ``symbol`` (one symbol per annotation) and
            ``sample`` (an array of positions that supports boolean-mask indexing).
        type: The single annotation symbol to keep. The name shadows the
            builtin inside this function only; it is kept so existing
            callers continue to work.

    Returns:
        The entries of ``annotation.sample`` whose symbol matches ``type``.
    """
    wanted_symbols = [type]
    # np.isin replaces the deprecated np.in1d; for 1-D input the mask is identical
    ids = np.isin(annotation.symbol, wanted_symbols)

    # Only the positions are of interest
    beats = annotation.sample[ids]

    return beats

In [None]:
"""
A  --  Atrial premature beat
E  --  Ventricular escape beat
L  --  Left bundle branch block beat
N or .  --  Normal beat
R  --  Right bundle branch block beat
V  --  Premature ventricular contraction
!  --  Ventricular flutter wave
/  --  Paced beat
"""

import wfdb

def signal_segmentation(sig, type, output_dir=''):
    """Save every beat of one annotation type in a record as a 128x128 grayscale PNG.

    For each annotation of the requested type, the signal between the
    previous annotation (+20 samples) and the next annotation (-20 samples)
    is plotted without axes, saved, re-read as grayscale, padded to a square
    and resized to 128x128. Files are named ``fig_<record>_<n>.png`` where
    ``<record>`` is the last three characters of ``sig``.

    Args:
        sig: Record path without extension, as produced by ``get_records``.
        type: Annotation symbol of the beats to extract (see ``beat_annotations``).
        output_dir: Directory the images are written to; it must already exist.
            The default ``''`` writes into the current working directory.
    """
    count = 1
    signals, _ = wfdb.rdsamp(sig, channels = [0])
    ann = wfdb.rdann(sig, 'atr')
    imp_beats = beat_annotations(ann, type)
    # Converted once: the positions of all annotations do not change inside the loop
    beats = np.array(ann.sample)
    last_index = len(beats) - 1
    record_tag = sig[-3:]  # last 3 characters of the path (mit-bih file number)
    for i in tqdm(imp_beats):
        # np.where returns a tuple of index arrays; the first match is the annotation's position in `beats`
        j = np.where(beats == i)[0][0]
        if j == 0 or j == last_index:
            # The first and last annotation have no neighbour on one side, so no window can be cut
            continue
        # according to paper
        sig_start = beats[j-1] + 20
        sig_end = beats[j+1] - 20
        if sig_end <= sig_start:
            # Neighbouring annotations are too close: the window would be empty and yield a blank image
            continue
        data = signals[sig_start:sig_end, 0]

        # Plot and save the beat
        fig = plt.figure(dpi=300, frameon=False, figsize=(1.0,0.5))
        plt.plot(data, linewidth=0.5)
        plt.xticks([]), plt.yticks([])
        for spine in plt.gca().spines.values():
            spine.set_visible(False)
        # os.path.join also works when output_dir has no trailing separator
        filename = os.path.join(output_dir, 'fig_{}_{}'.format(record_tag, count) + '.png')
        fig.savefig(filename)
        # Close this specific figure so figures do not pile up in memory across thousands of beats
        plt.close(fig)
        im_gray = cv2.imread(filename, cv2.IMREAD_GRAYSCALE)
        # The saved image is 300px*150px (figsize=(1.0,0.5) at dpi=300); 75px on top and bottom make it square
        im_gray = cv2.copyMakeBorder(im_gray,75,75,0,0, cv2.BORDER_REPLICATE)
        im_gray = cv2.resize(im_gray, (128, 128), interpolation=cv2.INTER_LANCZOS4)
        cv2.imwrite(filename, im_gray)
        print('img writtten {}'.format(filename))
        count += 1
    # Reported once per record, after all of its beats have been processed
    print('img completed {}'.format(sig))

In [None]:
# creating database by segmentation of ecg beats into image

# Annotation symbol -> class folder, paired by position
labels = ['A', 'L', 'N', '/', 'V', 'R', 'E', '!']
output_dirs = ['APC/', 'LBBB/', 'NOR/', 'PAB/', 'PVC/', 'RBBB/', 'VEB/', 'VFE/']

# Only 6 of the 48 records are processed per run because of RAM shortage
# (e.g. records[0:6] covers records[0] to records[5]); change the slice and re-run for the rest.
partial_records = records[42:48]
#print(partial_records)

# `beat_type` instead of `type`: a top-level loop variable named `type` would
# replace the builtin for every later cell of the notebook.
for beat_type, output_dir in zip(labels, output_dirs):
    result_dir = 'MIT-BIH_DATABASE/'+output_dir
    os.makedirs(result_dir, exist_ok=True)
    for r in tqdm(partial_records):
        signal_segmentation(r, beat_type, output_dir=result_dir)

# no need to run this block anymore after data creation(beat segmentation / ECG to beat image conversion)

In [None]:
# count the length of different directory inside dataset and plot pie chart function

def plot_pie_chart_of_data(data, labels, colors, figName_with_ext, figureSize=(10,10), center_white_circle_radius=0.7):
    """Draw ``data`` as a donut chart, show it and save it to ``figName_with_ext``.

    Args:
        data: Wedge sizes, one per label.
        labels: Wedge labels.
        colors: Wedge colors.
        figName_with_ext: Output file name including its extension.
        figureSize: Figure size handed to ``plt.figure``.
        center_white_circle_radius: Radius of the white disc drawn over the
            centre of the pie to turn it into a donut.
    """
    plt.figure(figsize=figureSize)
    plt.pie(data, labels= labels, colors= colors, autopct='%1.1f%%')
    figure = plt.gcf()
    # A white disc on top of the pie centre produces the donut look
    donut_hole = plt.Circle((0,0), center_white_circle_radius, color='white')
    figure.gca().add_artist(donut_hole)
    plt.show()
    figure.savefig(figName_with_ext, dpi=400, bbox_inches='tight')

In [None]:
# Count the images in every class folder directly under the database root
Database_DIR = 'MIT-BIH_DATABASE/'
image_dirs = ['APC/', 'LBBB/', 'NOR/', 'PAB/', 'PVC/', 'VEB/', 'RBBB/', 'VFE/']

# next(os.walk(d)) describes d itself as (dirpath, dirnames, filenames); only the file list is counted
no_of_files_in_dir = [
    len(next(os.walk(os.path.join(Database_DIR, image_dir)))[2])
    for image_dir in image_dirs
]

print('Number of images in each directory={} and total number of images={}'.format(no_of_files_in_dir, sum(no_of_files_in_dir)))

# Same order as image_dirs so every count gets the right name and color
labels = ['APC', 'LBBB', 'NOR', 'PAB', 'PVC', 'VEB', 'RBBB', 'VFE']
colors = ['green','blue','red','skyblue','orange', 'yellow','magenta', 'cyan']

plot_pie_chart_of_data(no_of_files_in_dir,labels,colors, 'data_distribution.png')

# plt.figure(figsize=(10,10))
# my_circle=plt.Circle((0,0), 0.7, color='white')
# plt.pie(no_of_files_in_dir, labels=['APC', 'LBBB', 'NOR', 'PAB', 'PVC', 'VEB', 'RBBB', 'VFE'], colors=['green','blue','red','skyblue','orange', 'yellow','magenta', 'cyan'],autopct='%1.1f%%')
# p=plt.gcf()
# p.gca().add_artist(my_circle)
# plt.show()
# p.savefig('data_distribution.png', dpi=400, bbox_inches='tight')

In [None]:
# divide data images into train, test subdirectory
import glob
import random
import shutil

image_dirs = ['APC', 'LBBB', 'NOR', 'PAB', 'PVC', 'RBBB', 'VEB', 'VFE']

# The presence of train/APC marks the split as already done; the cell is then a no-op.
if not os.path.isdir('MIT-BIH_DATABASE/train/APC'):
    # Dedicated seeded generator: a fresh run always sends the same files to the same split.
    split_rng = random.Random(7)
    for i in image_dirs:
        current_path = 'MIT-BIH_DATABASE/'+i
        path_train = 'MIT-BIH_DATABASE/train/'+i
        path_test = 'MIT-BIH_DATABASE/test/'+i
        # exist_ok lets the cell resume after an interrupted earlier run
        os.makedirs(path_train, exist_ok=True)
        os.makedirs(path_test, exist_ok=True)
        # Count exactly the files that will be moved (any stray non-'fig*' file is ignored);
        # sorting makes the order independent of the file system before shuffling.
        beat_images = sorted(glob.glob(current_path+'/fig*'))
        no_of_files = len(beat_images)
        no_of_test_dir_files = round(no_of_files*0.2)
        print(no_of_files)
        split_rng.shuffle(beat_images)
        # First 20% of the shuffled list -> test, remaining 80% -> train
        for j in beat_images[:no_of_test_dir_files]:
            shutil.move(j,path_test)
        for j in beat_images[no_of_test_dir_files:]:
            shutil.move(j,path_train)
        
        
# moving 80%,20% data from MIT-BIH_DATABASE/ directory to MIT-BIH_DATABASE/train, MIT-BIH_DATABASE/test subdirectory 

# no need to run this block anymore after data folder (train, test) creation

In [None]:
# Count the images in every class folder of the train split
Database_DIR = 'MIT-BIH_DATABASE/train/'
image_dirs = ['APC/', 'LBBB/', 'NOR/', 'PAB/', 'PVC/', 'VEB/', 'RBBB/', 'VFE/']

# next(os.walk(d)) describes d itself as (dirpath, dirnames, filenames); only the file list is counted
no_of_files_in_dir = [
    len(next(os.walk(os.path.join(Database_DIR, image_dir)))[2])
    for image_dir in image_dirs
]

print('Number of images in each directory={} and total number of images={}'.format(no_of_files_in_dir, sum(no_of_files_in_dir)))

# Same order as image_dirs so every count gets the right name and color
labels = ['APC', 'LBBB', 'NOR', 'PAB', 'PVC', 'VEB', 'RBBB', 'VFE']
colors = ['green','blue','red','skyblue','orange', 'yellow','magenta', 'cyan']

plot_pie_chart_of_data(no_of_files_in_dir, labels, colors, 'data_distribution_train_dir.png')

# no need to run this block anymore after data folder (train, test) creation

In [None]:
# Count the images in every class folder of the test split
Database_DIR = 'MIT-BIH_DATABASE/test/'
image_dirs = ['APC/', 'LBBB/', 'NOR/', 'PAB/', 'PVC/', 'VEB/', 'RBBB/', 'VFE/']

# next(os.walk(d)) describes d itself as (dirpath, dirnames, filenames); only the file list is counted
no_of_files_in_dir = [
    len(next(os.walk(os.path.join(Database_DIR, image_dir)))[2])
    for image_dir in image_dirs
]

print('Number of images in each directory={} and total number of images={}'.format(no_of_files_in_dir, sum(no_of_files_in_dir)))

# Same order as image_dirs so every count gets the right name and color
labels = ['APC', 'LBBB', 'NOR', 'PAB', 'PVC', 'VEB', 'RBBB', 'VFE']
colors = ['green','blue','red','skyblue','orange', 'yellow','magenta', 'cyan']

plot_pie_chart_of_data(no_of_files_in_dir, labels, colors, 'data_distribution_test_dir.png')

# no need to run this block anymore after data folder (train, test) creation

In [None]:
# Keras
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Activation, Conv2D, Dense, Dropout, Flatten, MaxPool2D
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import regularizers
from tensorflow.keras.metrics import categorical_crossentropy

from tensorflow.keras.preprocessing.image import ImageDataGenerator

In [None]:
# GPUs visible to TensorFlow
public_devices = tf.config.experimental.list_physical_devices('GPU')
print('Number of GPU available', len(public_devices))

# Looping over an empty list does nothing, so no separate length check is required
for gpu in public_devices:
    # Grow GPU memory on demand instead of letting TensorFlow reserve all of it up front
    tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
# data augmentation paper function
def cropping(image, filename):
    """Write nine 96x96 crops of ``image``, each resized to 128x128, next to ``filename``.

    The crops form a 3x3 grid of windows (offsets 0, 16 and 32 pixels on both
    axes), which matches a 128x128 input. Each crop is saved as
    ``<filename without extension><column><Row>.png``, e.g. ``...leftTop.png``.

    Args:
        image: Image array indexed as ``image[rows, cols]``; the fixed
            offsets assume it is 128 pixels high and wide.
        filename: Path of the source image; only its stem is reused for the
            output names, the source file itself is not modified.
    """
    # splitext removes the real extension instead of assuming it is exactly four characters
    stem = os.path.splitext(filename)[0]

    # (name part, slice) for the three window positions along each axis
    row_bands = (('Top', slice(None, 96)), ('Center', slice(16, 112)), ('Bottom', slice(32, None)))
    col_bands = (('left', slice(None, 96)), ('center', slice(16, 112)), ('right', slice(32, None)))

    # Rows outermost: files are written Top -> Center -> Bottom, and left -> right within a row
    for row_name, rows in row_bands:
        for col_name, cols in col_bands:
            crop = cv2.resize(image[rows, cols], (128, 128))
            cv2.imwrite(stem + col_name + row_name + '.png', crop)

# no need to run this block anymore after data augmentation is done

In [None]:
# Root folders of the three dataset splits; the split cells create one sub-folder per beat class inside each.
train_path = "MIT-BIH_DATABASE/train"
valid_path = "MIT-BIH_DATABASE/valid"
test_path = "MIT-BIH_DATABASE/test"

In [None]:
# Data augmentation: add nine crops per training image for every class except NOR
augment_dirs = ['APC/', 'LBBB/', 'PAB/', 'PVC/', 'RBBB/', 'VEB/', 'VFE/']

# File-name endings that cropping() appends to the source file's stem
crop_suffixes = tuple(
    col + row + '.png'
    for row in ('Top', 'Center', 'Bottom')
    for col in ('left', 'center', 'right')
)

for image_dir in augment_dirs:
    path, dirs, files = next(os.walk(os.path.join(train_path,image_dir)))
    # Crops written by an earlier run sit in the same folder; skipping them keeps a
    # re-run from producing crops of crops.
    source_files = [file for file in files if not file.endswith(crop_suffixes)]
    for file in tqdm(source_files):
        imagefilepath = os.path.join(train_path,image_dir,file)
        image = cv2.imread(imagefilepath)
        if image is None:
            # cv2.imread signals an unreadable file by returning None rather than raising
            print('skipping unreadable image {}'.format(imagefilepath))
            continue
        cropping(image, imagefilepath)

# no need to run this block anymore after data augmentation is done

In [None]:
# Count the images in every class folder of the train split after augmentation

image_dirs = ['APC/', 'LBBB/', 'NOR/', 'PAB/', 'PVC/', 'VEB/', 'RBBB/', 'VFE/']

# next(os.walk(d)) describes d itself as (dirpath, dirnames, filenames); only the file list is counted
no_of_files_in_dir = [
    len(next(os.walk(os.path.join(train_path, image_dir)))[2])
    for image_dir in image_dirs
]

print('Number of images in each directory={} and total number of images={}'.format(no_of_files_in_dir, sum(no_of_files_in_dir)))

# Same order as image_dirs so every count gets the right name and color
labels = ['APC', 'LBBB', 'NOR', 'PAB', 'PVC', 'VEB', 'RBBB', 'VFE']
colors = ['green','blue','red','skyblue','orange', 'yellow','magenta', 'cyan']

plot_pie_chart_of_data(no_of_files_in_dir, labels, colors, 'data_distribution_after_augmentation.png')

# no need to run this block anymore after data folder (train, test) creation

In [None]:
# divide data images into train, test subdirectory
import glob
import random
import shutil

image_dirs = ['APC', 'LBBB', 'NOR', 'PAB', 'PVC', 'RBBB', 'VEB', 'VFE']

# The presence of valid/APC marks the split as already done; the cell is then a no-op.
# NOTE(review): the augmentation cell writes its crops into these same train folders with
# names that also start with 'fig', so crops of one beat can end up on both sides of this
# split — confirm whether that overlap between train and valid is acceptable.
if not os.path.isdir('MIT-BIH_DATABASE/valid/APC'):
    # Dedicated seeded generator: a fresh run always picks the same validation files.
    split_rng = random.Random(7)
    for i in image_dirs:
        current_path = 'MIT-BIH_DATABASE/train/'+i
        path_valid = 'MIT-BIH_DATABASE/valid/'+i
        # exist_ok lets the cell resume after an interrupted earlier run
        os.makedirs(path_valid, exist_ok=True)
        # Count exactly the files that can be moved; sorting makes the sampled set
        # independent of the order the file system lists them in.
        train_images = sorted(glob.glob(current_path+'/fig*'))
        no_of_files = len(train_images)
        no_of_valid_dir_files = round(no_of_files*0.2)
        print(no_of_files)
        # Move 20% of the class's training images into the validation folder
        for j in split_rng.sample(train_images, no_of_valid_dir_files):
            shutil.move(j,path_valid)
        
        
# moving 20% data from MIT-BIH_DATABASE/train/ directory to MIT-BIH_DATABASE/valid subdirectory 

# no need to run this block anymore after validation data folder (valid) creation

In [None]:
# Count the images in every class folder of the validation split
Database_DIR = 'MIT-BIH_DATABASE/valid/'
image_dirs = ['APC/', 'LBBB/', 'NOR/', 'PAB/', 'PVC/', 'VEB/', 'RBBB/', 'VFE/']

# next(os.walk(d)) describes d itself as (dirpath, dirnames, filenames); only the file list is counted
no_of_files_in_dir = [
    len(next(os.walk(os.path.join(Database_DIR, image_dir)))[2])
    for image_dir in image_dirs
]

print('Number of images in each directory={} and total number of images={}'.format(no_of_files_in_dir, sum(no_of_files_in_dir)))

# Same order as image_dirs so every count gets the right name and color
labels = ['APC', 'LBBB', 'NOR', 'PAB', 'PVC', 'VEB', 'RBBB', 'VFE']
colors = ['green','blue','red','skyblue','orange', 'yellow','magenta', 'cyan']

plot_pie_chart_of_data(no_of_files_in_dir, labels, colors, 'data_distribution_valid_dir.png')

# no need to run this block anymore after validation data folder (valid) creation

In [None]:
# dataset_size*epoch = number_of_iteration*batch_size

# Number of images per batch; passed as batch_size to the directory iterators created from these generators.
batchSize = 32

# One generator per split. Each only rescales pixel values by 1/255; no on-the-fly augmentation is configured.
train_gen = ImageDataGenerator(rescale=1./255)
valid_gen = ImageDataGenerator(rescale=1./255)
test_gen = ImageDataGenerator(rescale=1./255)

In [None]:
# One shared list fixes the class -> index mapping identically for all three splits
class_names = ['APC', 'LBBB', 'NOR', 'PAB', 'PVC', 'RBBB', 'VEB', 'VFE']

train_batches = train_gen.flow_from_directory(directory=train_path, target_size=(128,128), classes=class_names, batch_size=batchSize, seed=7)
# Built from valid_gen (not train_gen) so the validation data keeps its own preprocessing if train_gen ever gains augmentation
valid_batches = valid_gen.flow_from_directory(directory=valid_path, target_size=(128,128), classes=class_names, batch_size=batchSize, seed=7)
# shuffle=False keeps the test images in a fixed order so predictions can be matched against test_batches.classes
test_batches = test_gen.flow_from_directory(directory=test_path, target_size=(128,128), classes=class_names, batch_size=batchSize, seed=7, shuffle=False)

In [None]:
def plotImages(images_arr, batchSize, subplot_dim=[1,10]):
    """Show the given images in a grid of subplots without axes.

    Images are placed row by row; if there are more images than grid cells
    the surplus images are not drawn, and surplus cells stay blank.

    Args:
        images_arr: Iterable of images accepted by ``Axes.imshow``.
        batchSize: Not used by the function; kept so existing calls stay valid.
        subplot_dim: ``[rows, cols]`` of the subplot grid.
    """
    # squeeze=False always returns a 2-D array of axes, so a 1x1 (or 1xN) grid can be flattened as well
    fig, axes = plt.subplots(subplot_dim[0], subplot_dim[1], figsize=(20,20), squeeze=False)
    axes = axes.flatten()  # flatten converts the grid to a 1-D sequence of axes
    # Hide the frame of every cell, including cells that receive no image
    for ax in axes:
        ax.axis('off')
    for img, ax in zip(images_arr,axes):
        ax.imshow(img)
    plt.tight_layout()
    plt.show()

In [None]:
# taking (train) images and labels of only one batch (32 images,32 labels) and plot them

# NOTE: this rebinds the global name `labels` (until now a list of class-name strings) to the label array of this batch.
imgs, labels = next(train_batches)
# A 4x8 grid offers 32 cells, one per image of a full batch (batchSize = 32)
plotImages(imgs, batchSize, [4,8])
print(labels)

In [None]:
# model

def proposed_model(input_h, input_w, nb_classes):
    """Build the (uncompiled) CNN classifier.

    Architecture: three stages of [Conv-BN-Conv-BN-MaxPool-Dropout(0.2)] with
    64, 128 and 256 filters, followed by Flatten, a 2048-unit dense layer with
    batch normalisation and Dropout(0.5), and a softmax output.

    Args:
        input_h: Input image height.
        input_w: Input image width.
        nb_classes: Number of units of the softmax output layer.

    Returns:
        A ``Sequential`` model expecting inputs of shape ``(input_h, input_w, 3)``.
    """
    weight_decay = 0.0001
    stack = []

    for stage_index, n_filters in enumerate((64, 128, 256)):
        for conv_index in range(2):
            if stage_index == 0 and conv_index == 0:
                # The very first layer declares the input shape and is the only one with an L1+L2 penalty
                conv = Conv2D(filters=n_filters, kernel_size=(3,3), activation='elu',
                              kernel_regularizer=regularizers.l1_l2(weight_decay, weight_decay),
                              padding='same', input_shape=(input_h, input_w, 3),
                              kernel_initializer='glorot_uniform')
            else:
                conv = Conv2D(filters=n_filters, kernel_size=(3,3), activation='elu',
                              kernel_regularizer=regularizers.l2(weight_decay), padding='same')
            stack.append(conv)
            stack.append(BatchNormalization())
        # Each stage ends by halving the spatial resolution and applying light dropout
        stack.append(MaxPool2D(pool_size=(2, 2), strides=2))
        stack.append(Dropout(rate=0.2))

    # Classifier head
    stack.append(Flatten())
    stack.append(Dense(units=2048, activation='elu', kernel_regularizer=regularizers.l2(weight_decay)))
    stack.append(BatchNormalization())
    stack.append(Dropout(rate=0.5))
    stack.append(Dense(units=nb_classes, activation='softmax'))

    return Sequential(stack)

In [None]:
# 128x128 inputs, 8 output classes (one per beat-class folder)
model = proposed_model(128, 128, 8)
# summary() prints the table itself and returns None, so wrapping it in print() only adds a stray "None" line
model.summary()

In [None]:
# Learning rate handed to the Adam optimizer
lr = 0.001
# Categorical cross-entropy pairs with the softmax output layer of proposed_model; accuracy is the only tracked metric
model.compile(optimizer=Adam(learning_rate= lr), loss='categorical_crossentropy', metrics=['accuracy'])

In [None]:
# train model
# Short first training run
Epoch = 2
Verbose = 1

model.fit(x=train_batches, validation_data=valid_batches, epochs=Epoch, verbose=Verbose, shuffle=True)

# Save the whole model (architecture, optimizer state, weights) unless a saved
# file already exists, in which case the existing file is left untouched.
if not os.path.isdir('models'):
    os.makedirs('models')
if not os.path.isfile('models/ecg_arrgythmia_detection_model.h5'):
    model.save('models/ecg_arrgythmia_detection_model.h5')
    print('model saved successfully.')

In [None]:
# load model
from tensorflow.keras.models import load_model

# Replace the in-memory model with the one stored on disk
model = load_model('models/ecg_arrgythmia_detection_model.h5')
#prev_saved_model = load_model('models/cnn.h5')

# summary() prints the table itself and returns None, so wrapping it in print() only adds a stray "None" line
model.summary()
# print(prev_saved_model.get_weights())
# print(prev_saved_model.optimizer)

In [None]:
# train model
# Train the model loaded from 'models/ecg_arrgythmia_detection_model.h5' for 20 more epochs
Epoch = 20
Verbose = 1

# NOTE(review): train_batches is a directory iterator; whether shuffle=True has any effect for such input should be confirmed against the Keras fit() documentation.
model.fit(x=train_batches, validation_data=valid_batches, epochs=Epoch, verbose=Verbose, shuffle=True)
# Unlike the first training cell, this save is unconditional and overwrites the file that was loaded
model.save('models/ecg_arrgythmia_detection_model.h5')
print('model saved successfully.')

In [None]:
# Display the metrics dictionary attached to the model (presumably filled by the most recent fit call — confirm)
model.history.history

In [None]:
# load model
from tensorflow.keras.models import load_model

# NOTE(review): none of the visible cells writes '..._36_epoch.h5' (they save under other names), so this file
# must already exist on disk — confirm before relying on a fresh Restart & Run All.
model = load_model('models/ecg_arrgythmia_detection_model_36_epoch.h5')

In [None]:
# Train the model loaded from the '36_epoch' file for 2 more epochs
Epoch = 2
Verbose = 1

model.fit(x=train_batches, validation_data=valid_batches, epochs=Epoch, verbose=Verbose, shuffle=True)
# NOTE(review): two further epochs are trained, yet the file is labelled '37_epoch' — confirm the intended epoch count.
model.save('models/ecg_arrgythmia_detection_model_37_epoch.h5')
print('model saved successfully.')

In [None]:
# Metrics dictionary attached to the model
training_log = model.history.history
print(training_log.keys())

# One figure per metric, accuracy first and loss second: training curve plus its 'val_' counterpart
for metric in ('accuracy', 'loss'):
    plt.plot(training_log[metric])
    plt.plot(training_log['val_' + metric])
    plt.title('model ' + metric)
    plt.ylabel(metric)
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.show()

In [None]:
# test model

# test_batches was created with shuffle=False, so the prediction rows follow the iterator's fixed file order
predictions = model.predict(x = test_batches, verbose=1)

In [None]:
# evaluate test results
# Evaluate on the test split; the model was compiled with categorical cross-entropy loss and the accuracy metric
results = model.evaluate(x = test_batches, verbose=1)

In [None]:
# Display the value returned by model.evaluate on the test split
results

In [None]:
# Ground-truth labels of the test iterator (presumably one integer class index per test image, in iterator order — confirm)
test_labels = test_batches.classes
print(test_labels)

In [None]:
# Predicted class = position of the largest value along the last axis of each prediction row
rounded_predictions = np.argmax(predictions, axis=-1)

In [None]:
# confusion matrix plot function
from sklearn.metrics import confusion_matrix
import itertools

def plot_confusion_matrix_custom(cm, classes, normalize=False, title='Confusion matrix', cmap=plt.cm.Blues):
    """Print and plot a confusion matrix, then save the figure as 'confusion.jpg'.

    Args:
        cm: Square NumPy array of counts, rows = true class, columns = predicted class.
        classes: Tick labels, one per row/column, in the matrix's class order.
        normalize: If True, every row is divided by its sum before printing and
            plotting, so each row shows fractions of that true class.
        title: Figure title.
        cmap: Colormap of the heat-map.
    """
    if normalize:
        row_totals = cm.sum(axis=1)[:, np.newaxis]
        # Rows without any true sample stay all-zero instead of turning into NaN
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype='float'), where=row_totals != 0)
        print('Normalized confusion matrix')
    else:
        print('Confusion matrix, without normalization ')

    print(cm)

    # Drawn after the optional normalisation so the colors, the printed numbers and
    # the text-color threshold below all refer to the same values.
    plt.imshow(cm, interpolation = 'nearest', cmap = cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation = 45)
    plt.yticks(tick_marks, classes)

    # Cells above half of the maximum get white text for contrast against the darker background
    thresh = cm.max()/2
    for i,j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, "{:0.2f}".format(cm[i, j]), horizontalalignment="center", color="white" if cm[i,j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.savefig('confusion.jpg', dpi=400, pad_inches=0.1)

In [None]:
# confusion matrix output
# Class names ordered by the integer index the test iterator assigned to them, so that
# name k labels row/column k of the matrix. (A hand-written list in a different order
# than the `classes=` argument of flow_from_directory would mislabel every class.)
class_to_index = test_batches.class_indices
cm_plot_lables = sorted(class_to_index, key=class_to_index.get)

# labels= pins the matrix to one row/column per class even if a class never occurs in the test data
cm = confusion_matrix(y_true = test_labels, y_pred = rounded_predictions, labels = list(range(len(cm_plot_lables))))
# non normalized confusion matrix
#plot_confusion_matrix_custom(cm = cm, classes = cm_plot_lables)

# normalized confusion matrix
plot_confusion_matrix_custom(cm = cm, classes = cm_plot_lables, normalize = True)

In [None]:
# classification report
from sklearn.metrics import classification_report

# Names taken from the test iterator and ordered by class index, so each report row carries
# the name of the class its integer label actually stands for.
report_class_index = test_batches.class_indices
report_class_names = sorted(report_class_index, key=report_class_index.get)

classification_report_result = classification_report(
    test_labels,
    rounded_predictions,
    labels=list(range(len(report_class_names))),  # one row per class, even if a class is absent
    target_names=report_class_names,
)
print(classification_report_result)