In [None]:
from keras.layers import *
from keras.models import Model
from keras.models import Sequential
from keras.optimizers import Adam, RMSprop
from keras.activations import relu
from keras import backend
from tqdm import tqdm
import numpy as np
import scipy.stats as sc
import matplotlib.pyplot as plt
from google.colab import drive
# Mount Google Drive at /content/gdrive; the np.load calls further down read
# their .npy files from paths under this mount point.
drive.mount("/content/gdrive")

In [None]:
# RMSprop instance (learning rate 5e-5) that create_critic and create_GAN both
# pass to compile().
optimiser = RMSprop(0.00005)
# Input shape of the critic: 400 time steps, 1 channel. Also sets the width of
# the generator's final Dense layer via ecg_shape[0].
ecg_shape = (400,1)
# Number of critic updates the training loop performs per generator update.
n_critic = 7
# After each critic update every critic weight is clipped to
# [-clip_value, clip_value].
clip_value = 0.02


def wasserstein_loss(y_true, y_pred):
  """Wasserstein loss: the mean of the label-weighted critic scores.

  The training loop labels real samples -1 and fake samples +1, so the sign
  of the label decides whether a score is pushed up or down.
  """
  signed_scores = y_true * y_pred
  return backend.mean(signed_scores)
 

def create_critic():
  """Build, compile and summarise the 1-D convolutional WGAN critic.

  Architecture: a Reshape to `ecg_shape`, four Conv1D -> MaxPooling1D ->
  LeakyReLU(0.2) stages (64, 128, 256, 512 filters), then Flatten and a single
  linear output unit that produces the unbounded critic score.

  Returns:
    The compiled Sequential critic. Because it is compiled with
    metrics=['accuracy'], `train_on_batch` returns [loss, accuracy].
  """
  critic = Sequential()
  # Only the first layer of a Sequential model defines the input shape.
  critic.add(Reshape(ecg_shape, input_shape=ecg_shape))

  # (number of filters, pooling stride) for each convolutional stage.
  conv_stages = [(64, 1), (128, 1), (256, 2), (512, 2)]
  for filters, pool_stride in conv_stages:
    critic.add(Conv1D(filters, kernel_size=5, strides=1))
    critic.add(MaxPooling1D(pool_size=5, strides=pool_stride))
    critic.add(LeakyReLU(0.2))

  critic.add(Flatten())
  # Linear activation: a Wasserstein critic outputs a score, not a probability.
  critic.add(Dense(units=1, activation='linear'))

  critic.compile(loss=wasserstein_loss, metrics=['accuracy'], optimizer=optimiser)
  critic.summary()

  return critic

def create_generator():
  """Build and summarise the recurrent generator (left uncompiled).

  A (400, 1) noise sequence passes through two bidirectional LSTM layers
  (128 units per direction, full sequences returned), each followed by
  Dropout(0.1). The result is flattened and projected by a tanh Dense layer
  to `ecg_shape[0]` values per sample.

  Returns:
    The uncompiled Sequential generator.
  """
  noise_shape = (400, 1)
  n_outputs = ecg_shape[0]

  model = Sequential()

  # Recurrent stage 1 (also declares the model input shape).
  model.add(Bidirectional(LSTM(128, return_sequences=True), input_shape=noise_shape))
  model.add(Dropout(0.1))

  # Recurrent stage 2.
  model.add(Bidirectional(LSTM(128, return_sequences=True)))
  model.add(Dropout(0.1))

  # Collapse the sequence and emit one value per output time step in [-1, 1].
  model.add(Flatten())
  model.add(Dense(n_outputs, activation='tanh'))

  model.summary()

  return model


def create_GAN(critic, generator):
  """Stack generator and critic into the model used to train the generator.

  Side effect: sets `critic.trainable = False` so that compiling the stacked
  model only exposes the generator's weights to its optimizer.

  Args:
    critic: compiled critic model (see create_critic).
    generator: generator model (see create_generator).

  Returns:
    The compiled Sequential model noise -> generator -> critic score.
  """
  critic.trainable = False

  GAN = Sequential()
  GAN.add(Input(shape=(400,1)))
  GAN.add(generator)
  GAN.add(critic)

  # Use a fresh optimizer with the same hyper-parameters as the module-level
  # `optimiser` rather than the same instance. Recent Keras/TensorFlow
  # optimizers are bound to the variable set they first update (the critic's)
  # and raise when asked to update another model's variables.
  gan_optimiser = type(optimiser).from_config(optimiser.get_config())

  GAN.compile(loss=wasserstein_loss, optimizer=gan_optimiser)
  GAN.summary()
  return GAN

In [None]:
def show_results(generator,norm_value):
  """Generate one ECG from fresh Gaussian noise and plot it.

  The noise is drawn with NumPy (imported in the first cell), so this function
  does not depend on a later cell having imported TensorFlow. The shape debug
  prints were removed to keep the training output readable.

  Args:
    generator: model whose `predict` maps a (1, 400, 1) noise batch to one
      generated trace per sample.
    norm_value: factor the generator output is multiplied by to undo the
      normalisation applied to the training data.
  """
  # Batch of a single standard-normal noise sequence, float32 like Keras inputs.
  noise = np.random.normal(size=(1, 400, 1)).astype(np.float32)
  ecgs = generator.predict(noise)
  denorm_ecgs = ecgs * norm_value

  plt.plot(denorm_ecgs[0])
  plt.grid()
  plt.xlabel('Time step')
  plt.ylabel('Amplitude(mV)')
  plt.xlim((0,400))
  plt.show()


In [None]:
import tensorflow as tf

def compute_kernel(x, y):
    """Pairwise exponential kernel matrix between the rows of x and the rows of y.

    Entry (i, j) is exp(-mean((x[i] - y[j])**2) / dim), where dim is the number
    of columns. The numeric definition is unchanged from the original.

    Args:
        x: 2-D array/tensor of shape (n, dim).
        y: 2-D array/tensor of shape (m, dim); cast to x's dtype.

    Returns:
        Tensor of shape (n, m) with the same dtype as x.
    """
    x = tf.convert_to_tensor(x)
    # TensorFlow does not promote dtypes implicitly, so align y with x up front.
    y = tf.cast(y, x.dtype)
    # Cast to the input dtype (not a hard-coded float64) so float32 inputs work.
    dim = tf.cast(tf.shape(x)[1], x.dtype)
    # Broadcasting (n, 1, dim) - (1, m, dim) avoids the two explicit
    # (n, m, dim) tiled copies the original built with tf.tile.
    diff = tf.expand_dims(x, 1) - tf.expand_dims(y, 0)
    return tf.exp(-tf.reduce_mean(tf.square(diff), axis=2) / dim)

def compute_mmd(x, y, sigma_sqr=1.0):
    """Maximum mean discrepancy between the sample sets x and y.

    Computed as mean(k(x, x)) + mean(k(y, y)) - 2 * mean(k(x, y)) with the
    kernel from compute_kernel.

    Args:
        x: samples of the first distribution, one per row.
        y: samples of the second distribution, one per row.
        sigma_sqr: accepted for interface compatibility; not used by the
            computation.
    """
    mean_k_xx = tf.reduce_mean(compute_kernel(x, x))
    mean_k_yy = tf.reduce_mean(compute_kernel(y, y))
    mean_k_xy = tf.reduce_mean(compute_kernel(x, y))
    return mean_k_xx + mean_k_yy - 2 * mean_k_xy

In [None]:

def mmd_loss():
  """Print and return the MMD between real ECGs and freshly generated ones.

  Reads the module-level `generator` and `norm_value`. Real samples are columns
  2000-2799 of the concatenated recordings, which lie inside the 2000-2999
  range that the training cell removes from its training set. The fake samples
  come from a single batched `predict` call instead of 800 single-sample calls.

  Returns:
    The scalar value produced by compute_mmd(real, fake).
  """
  n_samples = 800

  data_1 = np.load('/content/gdrive/My Drive/4YP/Data/ECG_Real_2.npy')
  data_2 = np.load('/content/gdrive/My Drive/4YP/Data/ECG_Real_2_Test.npy')
  data_3 = np.load('/content/gdrive/My Drive/4YP/Data/ECG_Real_2_Extra.npy')
  real_data_set = np.concatenate([data_1,data_2,data_3],axis =1)
  # One sample per row after the transpose: (n_samples, time steps).
  real_data = np.transpose(real_data_set[:,2000:2000 + n_samples])

  # Generate all fake samples in one batch.
  noise = tf.random.normal([n_samples, 400, 1])
  ecgs = generator.predict(noise, verbose=0)
  # float64 matches the dtype of the array the original filled element by
  # element; compute_kernel needs both inputs in the same dtype.
  fake_data = np.asarray(ecgs, dtype=np.float64).reshape(n_samples, 400) * norm_value

  # Named differently from the function to avoid shadowing it locally.
  mmd_value = compute_mmd(real_data,fake_data)
  print(mmd_value)

  return mmd_value

In [None]:
import tensorflow as tf

# Training hyper-parameters.
batch_size = 64
epochs = 5000

# Build the models. create_GAN marks the critic as non-trainable before
# compiling the stacked generator -> critic model.
critic = create_critic()
generator = create_generator()
GAN = create_GAN(critic, generator)

# Load the three recordings and join them along axis 1 (one ECG per column).
data_1 = np.load('/content/gdrive/My Drive/4YP/Data/ECG_Real_2.npy')
data_2 = np.load('/content/gdrive/My Drive/4YP/Data/ECG_Real_2_Test.npy')
data_3 = np.load('/content/gdrive/My Drive/4YP/Data/ECG_Real_2_Extra.npy')
data = np.concatenate([data_1,data_2,data_3],axis =1)

# Remove columns 2000-2999 from the training set; mmd_loss() evaluates against
# columns 2000-2799 of the same concatenation.
data1 = data[:,:2000]
data2 = data[:,3000:]
data = np.concatenate([data1,data2],axis =1)

# Normalise by the largest absolute amplitude so the data lies in [-1, 1],
# the range of the generator's tanh output. Vectorised row-wise reductions
# replace the original per-row Python loop.
maxim = data.max(axis=1).astype(np.float64)
minim = (-data.min(axis=1)).astype(np.float64)
scale = np.concatenate([maxim,minim])
norm_value = scale.max()
data = data / norm_value

# Fixed pool of 1000 noise sequences that the training loop samples from.
noise_array =  np.array(tf.random.normal([1000, 400, 1]))

d_losses, d_accuracy, g_losses = [], [], []

eval_every = 100   # epochs between sample plots / MMD evaluations
n_preview = 4      # generated samples plotted at each evaluation
# NOTE(review): 954 is the hard-coded number of training columns sampled from
# `data`; confirm it matches data.shape[1].
n_train_columns = 954

# One slot per MMD evaluation (50 for 5000 epochs evaluated every 100).
mmd_losses = np.zeros(-(-epochs // eval_every))

# Wasserstein targets: real -> -1, fake -> +1; the generator is trained
# towards the "real" target.
y_real = -np.ones(batch_size)
y_fake = np.ones(batch_size)
y = - np.ones(batch_size)

for epoch in tqdm(range(epochs)):

  for _ in range(n_critic):

    # Real batch: pick random columns, then reshape to (batch, 400, 1).
    idx = np.random.randint(0, high = n_train_columns , size= batch_size)
    real_ecgs = np.transpose(data[:,idx])
    real_ecgs = tf.expand_dims(real_ecgs, axis=2)

    # Fake batch: generator output for noise drawn from the fixed pool.
    idx = np.random.randint(0,high= noise_array.shape[0] ,size=batch_size)
    noise = noise_array[idx,:,:]

    fake_ecgs = generator.predict(noise,batch_size=batch_size)
    fake_ecgs = tf.expand_dims(fake_ecgs, axis=2)

    X_real = real_ecgs
    X_fake = fake_ecgs

    critic.trainable = True
    c_loss_real = critic.train_on_batch(X_real ,y_real)
    c_loss_fake = critic.train_on_batch(X_fake ,y_fake)
    critic.trainable = False

    # Weight clipping after every critic update.
    for layer in critic.layers:
        weights = layer.get_weights()
        weights = [np.clip(w, -clip_value, clip_value) for w in weights]
        layer.set_weights(weights)

  # Generator step, using the noise batch of the last critic iteration.
  g_loss = GAN.train_on_batch(noise, y)

  # Record the critic statistics of the last critic iteration so the loss and
  # accuracy plots are no longer empty. train_on_batch returns [loss, accuracy]
  # when metrics are compiled in and a bare scalar otherwise; atleast_1d
  # handles both.
  c_real = np.atleast_1d(c_loss_real)
  c_fake = np.atleast_1d(c_loss_fake)
  d_losses.append(0.5 * (c_real[0] + c_fake[0]))
  if c_real.size > 1:
    d_accuracy.append(0.5 * (c_real[1] + c_fake[1]))
  g_losses.append(g_loss)


  if epoch % eval_every == 0:
    for preview_idx in range(n_preview):
      show_results(generator,norm_value)
    # Keep the MMD history instead of discarding the returned value.
    mmd_losses[epoch // eval_every] = float(mmd_loss())


# Critic and generator loss curves over training.
plt.figure()
plt.ylabel('Loss Value')
plt.xlabel('Epochs')
plt.xlim((0, epochs))
plt.plot(d_losses, 'r', label='disc_loss')
plt.plot(g_losses, 'b', label='gen_loss')
# The curves carry labels, so render the legend that displays them.
plt.legend()
plt.grid()
plt.show()

# Critic accuracy metric over training.
plt.plot(d_accuracy)
plt.ylabel('Accuracy')
plt.xlabel('Epochs')
plt.grid()
plt.show()

# Six samples from the trained generator.
for final_sample_idx in range(6):
  show_results(generator,norm_value)

In [None]:
# Plot twelve further samples from the trained generator, one figure each.
for sample_no in range(12):
  show_results(generator,norm_value)

In [None]:
# Final evaluation: prints the MMD between the real ECGs selected inside
# mmd_loss and 800 samples from the trained generator.
mmd_loss()