# ECG Generator

That notebook generates a training set for the ECG stream anomaly detection using GAN

## Imports

In [1]:
import csv
import numpy as np
import pandas as pd
import scipy
from sklearn.model_selection import train_test_split
import tensorflow as tf

## Data preparation

In [2]:
dataframe = pd.read_csv('C:/ecg.csv', header=None)
raw_data = dataframe.values

# get last element
labels = raw_data[:, -1]

# rest are data
data = raw_data[:, 0:-1]

train_data, test_data, train_labels, test_labels = train_test_split(data, labels, test_size=0.2, random_state=21)

min_val =  tf.reduce_min(train_data)
max_val = tf.reduce_max(train_data)

train_data = (train_data - min_val)/ (max_val - min_val)

train_data = tf.cast(train_data, tf.float32)
test_data = tf.cast(test_data, tf.float32)

train_labels = train_labels.astype(bool)
test_labels = test_labels.astype(bool)

normal_train_data = train_data[train_labels]
normal_test_data = test_data[test_labels]

anomalous_train_data = train_data[~train_labels]
anomalous_test_data = test_data[~test_labels]

## The GAN model
Below there is a definition of GAN model that generates ECG-like data

In [7]:
def sample_generator(dataset, input_dim):
    # Define the dimensions of the data
    data = dataset

    # Define the generator model
    def build_generator():
        model = tf.keras.models.Sequential()
        model.add(tf.keras.layers.Dense(256, input_dim=input_dim))
        model.add(tf.keras.layers.Activation('relu'))
        model.add(tf.keras.layers.Dense(512))
        model.add(tf.keras.layers.Activation('relu'))
        model.add(tf.keras.layers.Dense(input_dim))
        model.add(tf.keras.layers.Activation('sigmoid'))
        return model

    # Define the discriminator model
    def build_discriminator():
        model = tf.keras.models.Sequential()
        model.add(tf.keras.layers.Dense(512, input_dim=input_dim))
        model.add(tf.keras.layers.Activation('relu'))
        model.add(tf.keras.layers.Dense(256))
        model.add(tf.keras.layers.Activation('relu'))
        model.add(tf.keras.layers.Dense(1))
        model.add(tf.keras.layers.Activation('sigmoid'))
        return model

    # Build the GAN model
    def build_gan(generator, discriminator):
        discriminator.trainable = False
        gan_input = tf.keras.layers.Input(shape=(input_dim,))
        generated_data = generator(gan_input)
        gan_output = discriminator(generated_data)
        gan = tf.keras.models.Model(inputs=gan_input, outputs=gan_output)
        gan.compile(loss='binary_crossentropy', optimizer=tf.optimizers.Adam(learning_rate=0.0002, beta_1=0.5))
        return gan

    # Create instances of the generator and discriminator
    generator = build_generator()
    discriminator = build_discriminator()

    generator.compile(loss='binary_crossentropy', optimizer=tf.optimizers.Adam(learning_rate=0.0002, beta_1=0.5))
    discriminator.compile(loss='binary_crossentropy', optimizer=tf.optimizers.Adam(learning_rate=0.0002, beta_1=0.5))


    gan = build_gan(generator, discriminator)

    # Training loop
    batch_size = 32
    epochs = 1000

    for epoch in range(epochs):
        # Generate random noise as input for the generator
        noise = np.random.normal(0, 1, size=(batch_size, input_dim))

        # Generate fake data using the generator
        generated_data = generator.predict(noise)

        # Get the number of data samples in your dataset
        num_samples = data.shape[0]

    # Set the number of randomly chosen data samples you want
        num_random_samples = 10

    # Randomly select indices of the data samples
        random_indices = np.random.choice(num_samples, size=num_random_samples, replace=False)

    # Retrieve the randomly chosen data samples
        random_data = []

        for i in random_indices:
            random_data.append(data[i])

    # Concatenate the real data and generated data
        x = np.concatenate((random_data, generated_data))

        # Create the labels for the discriminator
        y = np.zeros(42)
        y[:batch_size] = 1

        # Train the discriminator
        discriminator_loss = discriminator.train_on_batch(x, y)

        # Generate new random noise as input for the generator
        noise = np.random.normal(0, 1, size=(batch_size, input_dim))

        # Create labels for the generator (tricking the discriminator)
        y = np.ones(batch_size)

        # Train the generator (via the whole GAN model, with the discriminator weights frozen)
        generator_loss = gan.train_on_batch(noise, y)

        # Print the progress
        if epoch % 100 == 0:
            print(f"Epoch {epoch}/{epochs} | Discriminator loss: {discriminator_loss} | Generator loss: {generator_loss}")
    # Generate samples using the trained generator
    return generator

## Creation of generators' models

In [8]:
normal_generator = sample_generator(normal_train_data, 140)
anomaly_generator= sample_generator(anomalous_train_data, 140)

Epoch 0/1000 | Discriminator loss: 0.7096091508865356 | Generator loss: 0.5839999914169312
Epoch 100/1000 | Discriminator loss: 0.5781688094139099 | Generator loss: 0.31573987007141113
Epoch 200/1000 | Discriminator loss: 0.5609455108642578 | Generator loss: 0.22387032210826874
Epoch 300/1000 | Discriminator loss: 0.5453506708145142 | Generator loss: 0.34262362122535706
Epoch 400/1000 | Discriminator loss: 0.5362114906311035 | Generator loss: 0.30845290422439575
Epoch 500/1000 | Discriminator loss: 0.5541907548904419 | Generator loss: 0.28051459789276123
Epoch 600/1000 | Discriminator loss: 0.5459256768226624 | Generator loss: 0.2769061326980591
Epoch 700/1000 | Discriminator loss: 0.5482327342033386 | Generator loss: 0.27564895153045654
Epoch 800/1000 | Discriminator loss: 0.5476164817810059 | Generator loss: 0.2800584137439728
Epoch 900/1000 | Discriminator loss: 0.552757740020752 | Generator loss: 0.27971571683883667
Epoch 0/1000 | Discriminator loss: 0.6863799691200256 | Generator 

## Model exporting

In [10]:
normal_generator.save('models/generators/normal_ecg_generator')
anomaly_generator.save('models/generators/anomalous_ecg_generator')

INFO:tensorflow:Assets written to: models/generators/normal_ecg_generator\assets


INFO:tensorflow:Assets written to: models/generators/normal_ecg_generator\assets


INFO:tensorflow:Assets written to: models/generators/anomalous_ecg_generator\assets


INFO:tensorflow:Assets written to: models/generators/anomalous_ecg_generator\assets


## New data generating

In [11]:
normal_generated_data = normal_generator.predict(np.random.normal(0, 1, size=(12000, 140)))



In [12]:
anomalous_generated_data = anomaly_generator.predict(np.random.normal(0,1, size=(4000, 140)))



## Data postprocessing
The code below filters generated data to make them more 'natural'

In [13]:
def filter_data(generated_data):
    dataset = []

    for sample in generated_data:
        filtered_generated_sample = []
        filtered_generated_sample = np.append(filtered_generated_sample, scipy.signal.wiener(sample, 15))
        dataset.append(filtered_generated_sample)
    return dataset

In [14]:
normal_generated_data = filter_data(normal_generated_data)
anomalous_generated_data = filter_data(anomalous_generated_data)

## Data labeling

In [15]:
new_column = np.ones(12000)
normal_generated_data = np.concatenate((normal_generated_data, new_column[:, np.newaxis]), axis=1)
new_column = np.zeros(4000)
anomalous_generated_data = np.concatenate((anomalous_generated_data, new_column[:, np.newaxis]), axis=1)

## Creation of training dataset

In [16]:
generated_data = np.concatenate((normal_generated_data, anomalous_generated_data), axis=0)
np.random.shuffle(generated_data)

## Training dataset exporting

In [17]:
with open('ecg_dataset.csv', 'w', encoding='UTF-8', newline='') as f:
    writer = csv.writer(f)
    writer.writerows(generated_data)