# Setup

### Intall missing python modules

In [1]:
!pip install wfdb



### Imports

In [2]:
from glob import glob
import os
import numpy as np
import random
import wfdb
import pandas as pd
import tqdm
import cv2
from tensorflow.keras.layers import Dense, Dropout, Conv2D, Input, MaxPool2D, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import ELU
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.optimizers import Adam
import tensorflow.keras.backend as K
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard, ReduceLROnPlateau, LearningRateScheduler

from keras.utils import np_utils
import math
import matplotlib.pyplot as plt
import tensorflow as tf

Using TensorFlow backend.


# Split Dataset

In [3]:
INITIAL_DS = 'mitdb/'
DS_ROOT_DIR = '/Users/miki/Github/ECG/ECG_MIT_BIH_Arryhthmia_2D_CNN/MIT_DATA/'
OUT_DIRS = ['NOR/', 'LBBB/', 'RBBB/', 'APC/', 'VPC/', 'PE/', 'VEB/', 'VFW']

# Model Definition

In [4]:
def proposed_model(nb_classes=8, input_h=128, input_w=128):
    input_shape = (input_h, input_w, 3)

    inputs = Input(input_shape)

    # layer 1
    x = Conv2D(64, (3, 3), strides=(1, 1), kernel_initializer='glorot_uniform')(inputs)
    x = ELU()(x)
    x = BatchNormalization()(x)

    # layer 2
    x = Conv2D(64, (3, 3), strides=(1, 1), kernel_initializer='glorot_uniform')(x)
    x = ELU()(x)
    x = BatchNormalization()(x)

    # layer3
    x = MaxPool2D(pool_size=(2, 2), strides=(2, 2))(x)

    # layer4
    x = Conv2D(128, (3, 3), strides=(1, 1), kernel_initializer='glorot_uniform')(x)
    x = ELU()(x)
    x = BatchNormalization()(x)

    # layer5
    x = Conv2D(128, (3, 3), strides=(1, 1), kernel_initializer='glorot_uniform')(x)
    x = ELU()(x)
    x = BatchNormalization()(x)

    # layer6
    x = MaxPool2D(pool_size=(2, 2), strides=(2, 2))(x)

    # layer7
    x = Conv2D(256, (3, 3), strides=(1, 1), kernel_initializer='glorot_uniform')(x)
    x = ELU()(x)
    x = BatchNormalization()(x)

    # layer 8
    x = Conv2D(256, (3, 3), strides=(1, 1), kernel_initializer='glorot_uniform')(x)
    x = ELU()(x)
    x = BatchNormalization()(x)

    # layer 9
    x = MaxPool2D(pool_size=(2, 2), strides=(2, 2))(x)

    x = Flatten()(x)

    # layer 10
    x = Dense(2048)(x)
    x = ELU()(x)
    x = BatchNormalization()(x)
    x = Dropout(0.5)(x)
    x = Dense(nb_classes, activation='softmax')(x)

    model = Model(inputs, x)
    return model

In [5]:
model = proposed_model()
model.summary()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 128, 128, 3)]     0         
_________________________________________________________________
conv2d (Conv2D)              (None, 126, 126, 64)      1792      
_________________________________________________________________
elu (ELU)                    (None, 126, 126, 64)      0         
_________________________________________________________________
batch_normalization (BatchNo (None, 126, 126, 64)      256       
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 124, 124, 64)      36928     
_________________________________________________________________
elu_1 (ELU)                  (None, 124, 124, 64)      0         
_________________________________________________________________
batch_normalization_1 (Batch (None, 124, 124, 64)      256   

# Train

In [6]:
class Step(Callback):

    def __init__(self, verbose=0):
        self.verbose = verbose
        self.current_epoch = 1

    def change_lr(self, new_lr):
        K.set_value(self.model.optimizer.lr, new_lr)
        if self.verbose == 1:
            print('Learning rate is %g' %new_lr)

    def on_epoch_begin(self, epoch, logs={}):
        self.current_epoch = epoch
    def on_batch_begin(self, batch, logs={}):
        new_lr = 0.0001 * 0.95 ** math.ceil(self.current_epoch*batch/1000)
        self.change_lr(new_lr)



In [16]:
def plot_history(history, result_dir):
    plt.plot(history.history['accuracy'], marker='.')
    plt.plot(history.history['val_accuracy'], marker='.')
    plt.title('model accuracy')
    plt.xlabel('epoch')
    plt.ylabel('accuracy')
    plt.grid()
    plt.legend(['accuracy', 'val_accuracy'], loc='lower right')
    plt.savefig(os.path.join(result_dir, 'model_accuracy.png'))
    plt.close()
    plt.plot(history.history['loss'], marker='.')
    plt.plot(history.history['val_loss'], marker='.')
    plt.title('model loss')
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.grid()
    plt.legend(['loss', 'val_loss'], loc='upper right')
    plt.savefig(os.path.join(result_dir, 'model_loss.png'))
    plt.close()

In [17]:
def save_history(history, result_dir):
    loss = history.history['loss']
    acc = history.history['accuracy']
    val_loss = history.history['val_loss']
    val_acc = history.history['val_accuracy']
    nb_epoch = len(acc)

    with open(os.path.join(result_dir, 'result.txt'), 'w') as fp:
        fp.write('epoch\tloss\tacc\tval_loss\tval_acc\n')
        for i in range(nb_epoch):
            fp.write('{}\t{}\t{}\t{}\t{}\n'.format(
            i, loss[i], acc[i], val_loss[i], val_acc[i]))
        fp.close()

In [9]:
def process_batch(lines,img_path,inputH,inputW,train=True):
    imagew = 192
    imageh = 128
    num = len(lines)
    batch = np.zeros((num, inputH, inputW, 3), dtype='float32')


    labels = np.zeros(num, dtype='int')
    for i in range(num):
        path = lines[i].split(' ')[0]
        label = lines[i].split(' ')[-1]

        label = label.strip('\n')
        label = int(label)

        img = os.path.join(img_path, path)

        if train:
            crop_x = random.randint(0, np.max([0, imagew-inputW]))
            image = cv2.imread(img)

            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            image = image[:, crop_x:crop_x + inputW, :]
            batch[i] = image
            labels[i] = label
        else:
            image = cv2.imread(img)
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            image = image[:, 32:32+128, :]
            batch[i] = image
            labels[i] = label

    return batch, labels


In [10]:
def generator_train_batch( train_txt, batch_size, num_classes, img_path, inputH, inputW ):
    ff = open(train_txt, 'r')
    lines = ff.readlines()
    num = len(lines)
    while True:
        new_line = []
        index = [n for n in range(num)]
        random.shuffle(index)
        for m in range(num):
            new_line.append(lines[index[m]])

        for i in range(int(num/batch_size)):
            a = i*batch_size
            b = (i+1)*batch_size
            x_train, x_labels = process_batch(new_line[a:b], img_path, inputH, inputW, train=True)
            y = np_utils.to_categorical(np.array(x_labels), num_classes)
            yield x_train, y

In [11]:
def generator_val_batch(val_txt,batch_size,num_classes,img_path,inputH,inputW):
    f = open(val_txt, 'r')
    lines = f.readlines()
    num = len(lines)
    while True:
        new_line = []
        index = [n for n in range(num)]
        random.shuffle(index)
        for m in range(num):
            new_line.append(lines[index[m]])
        for i in range(int(num / batch_size)):
            a = i * batch_size
            b = (i + 1) * batch_size
            y_test,y_labels = process_batch(new_line[a:b],img_path,inputH,inputW,train=False)
            y = np_utils.to_categorical(np.array(y_labels), num_classes)
            yield y_test, y

In [23]:
imageh = 128
imagew = 128

inputH = 128
inputW = 192

outputdir = 'result/'
if os.path.isdir(outputdir):
    print('save in :' + outputdir)
else:
    os.makedirs(outputdir)

train_img_path = './MIT-BIH_AD/'
test_img_path = './MIT-BIH_AD/'
train_file = 'MIT-BIH_AD_train.txt'
test_file = 'MIT-BIH_AD_val.txt'
num_classes = 8

f1 = open(train_file, 'r')
f2 = open(test_file, 'r')
lines=f1.readlines()
f1.close()
train_samples=len(lines)
lines=f2.readlines()
f2.close()
val_samples=len(lines)

batch_size = 12
epochs = 4
input_h = 128
input_w = 128

model = proposed_model(nb_classes=num_classes)

lr = 0.0001
adam = Adam(lr=lr)
model.compile(loss='categorical_crossentropy', optimizer=adam, metrics=['accuracy'])
#model.summary()

callbacks=[
    TensorBoard(log_dir='./logs',histogram_freq=0,write_graph=True,write_grads=False,write_images=True),
    ModelCheckpoint('./result/mit_bih_2D.hdf5',monitor='val_loss',save_best_only=True,save_weights_only=True, verbose=1)
]

history = model.fit_generator(
    generator_train_batch(train_file, batch_size, num_classes, train_img_path, input_h, input_w),
    steps_per_epoch=train_samples // batch_size,
    epochs=epochs,
    callbacks=callbacks,
    validation_data=generator_val_batch(test_file, batch_size, num_classes, test_img_path, input_h, input_w),
    validation_steps=val_samples // batch_size,
    verbose=1
)


save in :result/
Epoch 1/4
   7/4649 [..............................] - ETA: 1:27:20 - loss: 5.3673 - accuracy: 0.3571

KeyboardInterrupt: 

In [18]:
plot_history(history, outputdir)

In [19]:
save_history(history, outputdir)

# Predict

In [20]:
model.load_weights('./result/mit_bih_2D.hdf5', by_name=True)

In [None]:
imageh = 128
imagew = 128

inputH = 128
inputW = 192
class_names = ['NOR','LBBB', 'RBBB', 'APC', 'PVC', 'PAB', 'VEB', 'VFE']

test_file = 'MIT-BIH_AD_test.txt'
test_img_path = './MIT_DATA/'
augmentation = True
output_img = False

f = open(test_file, 'r')
lines = f.readlines()
random.shuffle(lines)
TP = 0
count = 0
total = len(lines)

counter = {'NOR':0,'LBBB': 0, 'RBBB': 0, 'APC': 0, 'PVC': 0, 'PAB': 0, 'VEB': 0, 'VFE': 0}
tp_counter = {'NOR':0,'LBBB': 0, 'RBBB': 0, 'APC': 0, 'PVC': 0, 'PAB': 0, 'VEB': 0, 'VFE': 0}


for line in lines:
    path = line.split(' ')[0]
    label = line.split(' ')[-1]

    label = label.strip('\n')
    answer = int(label)
    img = os.path.join(test_img_path, path)
    image = cv2.imread(img)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    if augmentation:
        Hshmean = int(np.round(np.max([0, np.round((imageh - inputH) / 2)])))
        Wshmean = int(np.round(np.max([0, np.round((imagew - inputW) / 2)])))
        image = image[Hshmean:Hshmean + inputH, Wshmean:Wshmean + inputW, :]
        image = cv2.resize(image, (imagew, imageh))
    else:  
        pass
    
    input_data = np.zeros((1, imagew, imageh, 3), dtype='float32')
    input_data = image.reshape(1,128,128,3)
    pred = model.predict(np.float32(input_data))
    label = np.argmax(pred)
    #TP means for true positive
    if label == answer:
        TP += 1
        tp_counter[class_names[label]] += 1
    count += 1
    counter[class_names[answer]] += 1

print('Total:    Acc = {} '.format(str(TP / count)))
print('LBBB:{}/{}={},\n RBBB:{}/{}={},\n APC:{}/{}={},\n PVC:{}/{}={},\n PAB:{}/{}={},\n VEB:{}/{}={},\n VFE:{}/{}={}'.format(
    tp_counter['LBBB'], counter['LBBB'], (tp_counter['LBBB'] / counter['LBBB']),
    tp_counter['RBBB'], counter['RBBB'], (tp_counter['RBBB'] / counter['RBBB']),
    tp_counter['APC'], counter['APC'], (tp_counter['APC'] / counter['APC']),
    tp_counter['PVC'], counter['PVC'], (tp_counter['PVC'] / counter['PVC']),
    tp_counter['PAB'], counter['PAB'], (tp_counter['PAB'] / counter['PAB']),
    tp_counter['VEB'], counter['VEB'], (tp_counter['VEB'] / counter['VEB']),
    tp_counter['VFE'], counter['VFE'], (tp_counter['VFE'] / counter['VFE'])
))
