In [9]:
import tensorflow as tf # Models ran in venv python 3.9.16 with GPU computing support
from tensorflow import keras
import tensorflow as tf
import numpy as np
from tensorflow.keras.layers import Input, Dense, Reshape, Dropout, Flatten
from tensorflow.keras.layers import BatchNormalization, Activation, UpSampling1D
from tensorflow.keras.layers import Conv1DTranspose, Conv1D, Bidirectional, LSTM, MaxPool1D
from tensorflow.keras.layers import LeakyReLU, MaxPooling1D
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.optimizers import Adam
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import wfdb
import ast
import ecg_plot
import os
import scipy.io as sio
import xgboost as xgb
import glob
import imageio
import numpy as np
import os
import PIL
import time
from IPython import display

In [6]:
sr = 100

In [7]:
with tf.device('/CPU:0'):
    ### Loading raw data into mutable Datframes
    ptb = pd.read_csv('../data/ptbxl_database.csv')
    def load_raw_data(df, sampling_rate, path):
        if(sampling_rate == 100):
            data = [wfdb.rdsamp(path+f) for f in df.filename_lr]
        else:
            data = [wfdb.rdsamp(path+f) for f in df.filename_hr]
        data = np.array([signal for signal, meta in data])
        return data
    
    # load and convert annotation data
    Y = pd.read_csv('../data/ptbxl_database.csv', index_col='ecg_id')
    Y.scp_codes = Y.scp_codes.apply(lambda x: ast.literal_eval(x))

    # Load raw signal data
    X = load_raw_data(Y, sr, '../data/')

    # Load scp_statements.csv for diagnostic aggregation
    agg_df = pd.read_csv('../data/scp_statements.csv', index_col=0)
    agg_df = agg_df[agg_df.diagnostic == 1]

    def aggregate_diagnostic(y_dic):
        tmp = []
        for key in y_dic.keys():
            if key in agg_df.index:
                tmp.append(agg_df.loc[key].diagnostic_class)
        return list(set(tmp))


    # Apply diagnostic superclass
    Y['diagnostic_superclass'] = Y.scp_codes.apply(aggregate_diagnostic)

In [10]:

with tf.device('/GPU:0'):
    # Split data into train and test
    test_fold =10
    val_fold = 9

    def tvt_split(X, Y, val_fold, test_fold, shape = False):
        X_train = X[(Y.strat_fold != test_fold) & (Y.strat_fold != val_fold)]
        y_train = Y[(Y.strat_fold != test_fold) & (Y.strat_fold != val_fold)].age

        y_train = pd.get_dummies(y_train)

        y_train = y_train.idxmax(axis = 1).to_numpy()

        rX_train = X_train[(y_train < 89) & (y_train >= 75)] # Additional filtering of patients older than 89 and younger than 18
        ry_train = y_train[(y_train < 89) & (y_train >= 75)]

        if(shape == True):
            print((X_train.shape, y_train.shape))

        return rX_train, ry_train
    
    X_train, y_train = tvt_split(X, Y, val_fold, test_fold, True)

((17418, 1000, 12), (17418,))


In [11]:
class Discriminator():

    def __init__(self, input_shape):
        self.input_shape = input_shape

    def D_v3(self):
        '''
        url:https://github.com/MikhailMurashov/ecgGAN
        '''

        model = Sequential(name='Discriminator_v3')
        model.add(Conv1D(filters=32, kernel_size=16, strides=1, padding='same'))
        model.add(LeakyReLU())

        # model.add(Dropout(0.4))

        model.add(Conv1D(filters=64, kernel_size=16, strides=1, padding='same'))
        model.add(LeakyReLU())

        model.add(MaxPool1D(pool_size=2))

        model.add(Conv1D(filters=128, kernel_size=16, strides=1, padding='same'))
        model.add(LeakyReLU())

        # model.add(Dropout(0.4))

        model.add(Conv1D(filters=256, kernel_size=16, strides=1, padding='same'))
        model.add(LeakyReLU())

        model.add(MaxPool1D(pool_size=2))

        model.add(Flatten())
        model.add(Dense(1))
        
        model.summary()
        
        signal = Input(shape=self.input_shape)
        validity = model(signal)

        return Model(inputs=signal, outputs=validity)

In [12]:
class Generator:
    def __init__(self, latent_size, input_shape):
        self.latent_size = latent_size
        self.input_shape = input_shape

    def G_v5(self):
        '''
        url:https://github.com/MikhailMurashov/ecgGAN
        '''

        model = Sequential(name='Generator_v1')
        model.add(Reshape((self.latent_size, 1)))
        model.add(Bidirectional(LSTM(64, return_sequences=True)))

        model.add(Conv1D(filters=128, kernel_size=16, strides=1, padding='same'))
        model.add(LeakyReLU())
    
        model.add(Conv1D(filters=64, kernel_size=16, strides=1, padding='same'))
        model.add(LeakyReLU())
        
        model.add(UpSampling1D(2))
        
        model.add(Conv1D(filters=32, kernel_size=16, strides=1, padding='same'))
        model.add(LeakyReLU())
        
        model.add(Conv1D(filters=16, kernel_size=16, strides=1, padding='same'))
        model.add(LeakyReLU())

        model.add(Dense(self.input_shape[0]))
        model.add(Activation('tanh'))
        model.add(Reshape(self.input_shape))

        noise = Input(shape=(self.latent_size,))
        signal = model(noise)

        model.summary()

        return Model(inputs=noise, outputs=signal) 