In [None]:
https://github.com/nhsx/SynthVAE


In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, cohen_kappa_score

import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam 
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Input, Conv1D, Conv2D, MaxPooling2D, BatchNormalization, Reshape, UpSampling2D, Conv2DTranspose, Conv1DTranspose, Dropout, Dense, Flatten

# Run on the first GPU when TensorFlow reports one, otherwise fall back to the CPU.
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    device = '/device:GPU:0'
else:
    device = '/device:CPU:0'
print(f"Using device: {device}")

Using device: /device:GPU:0


In [2]:
from tensorflow.keras.layers import Lambda, Layer

def InceptionNucleus(input_layer, filters, one_d_dropout_rate=0):
    """Run parallel 1D convolutions of different widths and stack their outputs.

    Despite its name, ``filters`` is the sequence of *kernel sizes*: one branch
    is built per entry, each a 32-filter ``Conv1D`` ('same' padding, ReLU)
    followed by a ``Dropout`` layer.

    Args:
        input_layer: Tensor fed to every branch.
        filters: Iterable of kernel sizes, one per parallel branch.
        one_d_dropout_rate: Dropout rate applied after each branch's convolution.

    Returns:
        The branch outputs concatenated along the last axis.
    """
    def _branch(kernel_size):
        # Convolution first, then dropout, for a single kernel width.
        conv_out = Conv1D(filters=32, kernel_size=kernel_size, padding='same', activation='relu')(input_layer)
        return Dropout(one_d_dropout_rate)(conv_out)

    return tf.keras.layers.concatenate([_branch(width) for width in filters], axis=-1)

def InceptionNucleusTranspose(input_layer, filters):
    """Run parallel 1D transposed convolutions of different widths and stack them.

    As in ``InceptionNucleus``, ``filters`` holds the *kernel sizes*; every
    branch is a 32-filter ``Conv1DTranspose`` with 'same' padding and ReLU.

    Args:
        input_layer: Tensor fed to every branch.
        filters: Iterable of kernel sizes, one per parallel branch.

    Returns:
        The branch outputs concatenated along the last axis.
    """
    stacked = [
        Conv1DTranspose(filters=32, kernel_size=width, padding='same', activation='relu')(input_layer)
        for width in filters
    ]
    return tf.keras.layers.concatenate(stacked, axis=-1)

def sampling(args):
    """Draw a latent sample with the reparameterization trick.

    Args:
        args: Pair ``(z_mean, z_log_var)`` of tensors; the first two dimensions
            of ``z_mean`` give the shape of the noise that is drawn.

    Returns:
        ``z_mean + exp(0.5 * z_log_var) * epsilon`` where ``epsilon`` is
        standard-normal noise.
    """
    mu, log_variance = args
    mu_shape = tf.shape(mu)
    noise = tf.keras.backend.random_normal(shape=(mu_shape[0], mu_shape[1]))
    # exp(0.5 * log(sigma^2)) == sigma
    sigma = tf.exp(0.5 * log_variance)
    return mu + sigma * noise

# Custom layer for KL divergence loss
class KLDivergenceLayer(Layer):
    """Pass-through layer that registers the unweighted KL divergence as a loss.

    The KL term is computed between the diagonal Gaussian described by
    ``(z_mean, z_log_var)`` and a standard normal prior. The inputs are
    returned unchanged so the layer can be spliced into a functional graph.
    """

    def call(self, inputs):
        """Add the batch-averaged KL divergence to the layer losses.

        Args:
            inputs: Pair ``[z_mean, z_log_var]`` of tensors.

        Returns:
            ``inputs`` unchanged.
        """
        z_mean, z_log_var = inputs
        # KL per sample: summed over the latent dimensions (last axis).
        kl_per_sample = -0.5 * tf.reduce_sum(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var), axis=-1)
        # add_loss expects a scalar; averaging over the batch keeps the term
        # independent of the batch size and on the same footing as a
        # batch-averaged reconstruction loss.
        self.add_loss(tf.reduce_mean(kl_per_sample))
        return inputs


from tensorflow.keras.layers import GaussianNoise

class KLDivergenceLayerWeighted(Layer):
    """Pass-through layer that adds a weighted, batch-averaged KL loss.

    Args:
        kl_weight: Multiplier applied to the mean KL divergence before it is
            registered with ``add_loss``.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, kl_weight=0.01, **kwargs):
        super().__init__(**kwargs)
        self.kl_weight = kl_weight

    def call(self, inputs):
        """Register the weighted KL term and return ``inputs`` untouched."""
        mu, log_variance = inputs
        # Element-wise term of the closed-form KL against a standard normal.
        per_dimension = 1 + log_variance - tf.square(mu) - tf.exp(log_variance)
        kl_per_sample = -0.5 * tf.reduce_sum(per_dimension, axis=-1)
        self.add_loss(self.kl_weight * tf.reduce_mean(kl_per_sample))
        return inputs

# Modified VAE creation function to focus on synthesis without manual KL calculation in loss
def create_eeg_vae(one_d_dropout_rate=0, two_d_dropout_rate=0, latent_dim=32, kl_weight=0.01):
    """Build and compile a convolutional VAE for signals of shape (178, 1).

    Encoder: Gaussian input noise -> multi-width 1D convolutions
    (``InceptionNucleus``) -> 2D convolution + batch norm -> 2x2 max-pooling ->
    dense layers producing ``z_mean`` / ``z_log_var``.

    Decoder: starts from the sampled latent vector ``z``, projects it back to
    the pooled grid, upsamples, and mirrors the encoder's convolutions.

    The decoder is fed from ``z`` (not from the pooled encoder feature map), so
    the reconstruction really passes through the latent bottleneck and the KL
    layer is part of the trained graph.

    Args:
        one_d_dropout_rate: Dropout rate inside the encoder's 1D inception branches.
        two_d_dropout_rate: Dropout rate after the decoder's 2D transposed convolution.
        latent_dim: Size of the latent vector.
        kl_weight: Weight of the KL divergence term added by
            ``KLDivergenceLayerWeighted``.

    Returns:
        Tuple ``(vae, encoder)``: the compiled end-to-end model (Adam, MSE
        reconstruction loss plus the KL term registered via ``add_loss``) and a
        model mapping inputs to ``[z_mean, z_log_var, z]``.
    """
    kernel_sizes = [4, 8, 12]

    # ----- Encoder -----
    inputs = Input(shape=(178, 1))

    # Gaussian noise on the input promotes variability in the signal
    x = GaussianNoise(0.05)(inputs)

    # Multi-scale 1D convolutions: 3 branches x 32 filters -> (178, 96)
    x = InceptionNucleus(x, filters=kernel_sizes, one_d_dropout_rate=one_d_dropout_rate)
    seq_len, n_features = x.shape[1], x.shape[2]

    # Treat the (time, feature) map as a single-channel image for the 2D convolutions
    x = Reshape((seq_len, n_features, 1))(x)
    x = Conv2D(64, (3, 3), padding='same', activation='relu')(x)
    x = BatchNormalization()(x)
    encoded = MaxPooling2D((2, 2))(x)  # compressed feature map, (89, 48, 64)
    pooled_len, pooled_features = encoded.shape[1], encoded.shape[2]

    # Dense head on the *pooled* representation
    x = Flatten()(encoded)
    x = Dense(128, activation='relu')(x)

    # Latent mean and log-variance
    z_mean = Dense(latent_dim, name='z_mean')(x)
    z_log_var = Dense(latent_dim, name='z_log_var')(x)

    # Pass-through layer that registers the weighted KL loss
    z_mean, z_log_var = KLDivergenceLayerWeighted(kl_weight=kl_weight)([z_mean, z_log_var])

    # Reparameterized latent sample
    z = Lambda(sampling, output_shape=(latent_dim,), name='z')([z_mean, z_log_var])

    # ----- Decoder (driven by the latent sample) -----
    x = Dense(128, activation='relu')(z)
    # Project back to the pooled grid so that 2x upsampling restores (178, 96)
    x = Dense(pooled_len * pooled_features, activation='relu')(x)
    x = Reshape((pooled_len, pooled_features, 1))(x)

    x = UpSampling2D((2, 2))(x)  # reverse the max-pooling
    x = Conv2DTranspose(64, (3, 3), padding='same', activation='relu')(x)
    x = BatchNormalization()(x)
    x = Dropout(two_d_dropout_rate)(x)

    # Back to a 1D sequence: fold the 2D feature maps into the channel axis,
    # then squeeze them down to the 96 channels the inception block produced
    x = Reshape((seq_len, n_features * 64))(x)
    x = Conv1D(96, kernel_size=1, activation='relu')(x)

    # Mirror of the encoder's multi-scale convolutions
    x = InceptionNucleusTranspose(x, filters=kernel_sizes)

    # One sigmoid output channel per time step, matching the input shape
    outputs = Conv1D(1, kernel_size=1, padding='same', activation='sigmoid')(x)

    vae = Model(inputs, outputs)

    # MSE covers reconstruction only; the KL term comes from the custom layer
    vae.compile(optimizer='adam', loss='mse')

    # Encoder model exposing the latent statistics and the sampled vector
    encoder = Model(inputs, [z_mean, z_log_var, z], name="encoder")

    return vae, encoder

# Create the modified model for synthesis
# Build an untrained model pair with the default hyper-parameters and print both architectures
autoencoder, encoder = create_eeg_vae()
for _label, _model in (("Autoencoder:", autoencoder), ("Encoder:", encoder)):
    print(_label)
    _model.summary()

2024-11-05 14:56:09.920657: I metal_plugin/src/device/metal_device.cc:1154] Metal device set to: Apple M2 Pro
2024-11-05 14:56:09.920686: I metal_plugin/src/device/metal_device.cc:296] systemMemory: 16.00 GB
2024-11-05 14:56:09.920690: I metal_plugin/src/device/metal_device.cc:313] maxCacheSize: 5.33 GB
2024-11-05 14:56:09.920705: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:305] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2024-11-05 14:56:09.920714: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:271] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


Autoencoder:


Encoder:


In [3]:
# Read the train / test splits (first CSV column is the index)
train_data = pd.read_csv('./dataset/hard_test/train.csv', index_col=0)
test_data = pd.read_csv('./dataset/hard_test/test.csv', index_col=0)

# Every column except the last one is a signal sample (X1-X178)
X_train = train_data.iloc[:, :-1].values
X_test = test_data.iloc[:, :-1].values

# Min-max scale with statistics from the training split only, then add the
# trailing channel axis the model expects: (n_samples, 178, 1)
scaler = MinMaxScaler()
X_train_scaled = scaler.fit_transform(X_train).reshape(-1, 178, 1)
X_test_scaled = scaler.transform(X_test).reshape(-1, 178, 1)

In [4]:
def train_and_test_autoencoder(epochs=3, batch_size=32, one_d_dropout_rate=0, two_d_dropout_rate=0):
    """Train a fresh VAE on the training CSV, evaluate it on the test CSV and plot the results.

    The function reads both CSV files itself, min-max scales the signals with
    statistics from the training split, trains with early stopping on the
    validation loss (20% validation split), prints the test loss and shows a
    two-panel figure: loss curves and the first test sample against its
    reconstruction in the original scale.

    Args:
        epochs: Maximum number of training epochs.
        batch_size: Mini-batch size used for training.
        one_d_dropout_rate: Forwarded to ``create_eeg_vae``.
        two_d_dropout_rate: Forwarded to ``create_eeg_vae``.

    Returns:
        Tuple ``(autoencoder, encoder)`` as returned by ``create_eeg_vae``,
        with the autoencoder trained.
    """
    # --- Data: load, keep the signal columns (all but the last), scale, add channel axis ---
    train_df = pd.read_csv('./dataset/hard_test/train.csv', index_col=0)
    test_df = pd.read_csv('./dataset/hard_test/test.csv', index_col=0)

    X_train = train_df.iloc[:, :-1].values  # X1-X178
    X_test = test_df.iloc[:, :-1].values    # X1-X178

    scaler = MinMaxScaler()
    X_train_scaled = scaler.fit_transform(X_train).reshape(-1, 178, 1)
    X_test_scaled = scaler.transform(X_test).reshape(-1, 178, 1)

    # --- Device: first GPU if present, CPU otherwise ---
    device = '/device:GPU:0' if tf.config.list_physical_devices('GPU') else '/device:CPU:0'
    print(f"Using device: {device}")

    with tf.device(device):
        # --- Model: built and compiled by the factory ---
        autoencoder, encoder = create_eeg_vae(
            one_d_dropout_rate=one_d_dropout_rate,
            two_d_dropout_rate=two_d_dropout_rate,
        )

        stop_on_plateau = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

        # --- Training: the target is the input itself ---
        history = autoencoder.fit(
            X_train_scaled,
            X_train_scaled,
            epochs=epochs,
            batch_size=batch_size,
            validation_split=0.2,
            callbacks=[stop_on_plateau],
            verbose=1,
        )

        # --- Evaluation and reconstruction of the held-out split ---
        test_loss = autoencoder.evaluate(X_test_scaled, X_test_scaled)
        print("Test loss (reconstruction error):", test_loss)

        X_test_reconstructed = autoencoder.predict(X_test_scaled)

    # Undo the min-max scaling so the reconstruction is comparable with the raw signal
    X_test_reconstructed_unscaled = scaler.inverse_transform(X_test_reconstructed.reshape(-1, 178))

    # --- Figure: loss curves (left) and one reconstruction example (right) ---
    fig, (loss_ax, sample_ax) = plt.subplots(1, 2, figsize=(12, 6))

    loss_ax.plot(history.history['loss'], label='Train Loss')
    loss_ax.plot(history.history['val_loss'], label='Validation Loss')
    loss_ax.set_title('Training and Validation Loss')
    loss_ax.set_xlabel('Epochs')
    loss_ax.set_ylabel('Loss')
    loss_ax.legend()

    sample_ax.plot(X_test[0], label='Original')
    sample_ax.plot(X_test_reconstructed_unscaled[0], label='Reconstructed')
    sample_ax.set_title('Original vs Reconstructed Test Sample')
    sample_ax.legend()

    fig.tight_layout()
    plt.show()
    return autoencoder, encoder

In [None]:
# Keep the trained encoder as well: discarding it would leave the global
# `encoder` pointing at the untrained model created when the architecture
# was first built and summarised.
autoencoder, encoder = train_and_test_autoencoder(epochs=10, batch_size=32, one_d_dropout_rate=0, two_d_dropout_rate=0)

Using device: /device:GPU:0
Epoch 1/10


2024-11-05 14:56:13.684171: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:117] Plugin optimizer for device_type GPU is enabled.


[1m  1/260[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m3:57:46[0m 55s/step - loss: 0.0218

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Function to plot original vs augmented signals
def plot_original_vs_augmented(X_original, X_augmented, n_augmentations):
    """Plot the first two original signals, each with its augmented versions.

    Args:
        X_original: Indexable of signals; entries 0 and 1 are plotted.
        X_augmented: Indexable where ``X_augmented[i][k]`` is the k-th
            augmentation of original signal ``i``.
        n_augmentations: Number of augmentations drawn per original signal.
    """
    fig, axes = plt.subplots(2, 1, figsize=(12, 8))

    for sample_no, ax in enumerate(axes):
        # Solid blue line: the untouched signal
        ax.plot(X_original[sample_no].squeeze(), label='Original Signal', color='blue')

        # Dashed lines: one per augmentation of this sample
        for aug_idx in range(n_augmentations):
            ax.plot(
                X_augmented[sample_no][aug_idx].squeeze(),
                label=f'Augmented Signal {aug_idx+1}',
                linestyle='--',
            )

        ax.set_title(f'Original vs Augmented Signal for Sample {sample_no+1}')
        ax.legend()

    fig.tight_layout()
    plt.show()

# Function to manually augment a signal n times
def augment_signal_n_times(original_signal_scaled, autoencoder, n=5, noise_level=0.1):
    """Produce ``n`` augmentations by repeatedly passing a signal through the model.

    The first augmentation is the model's prediction for the original signal;
    every further one is the prediction for the previous augmentation.

    Args:
        original_signal_scaled: Scaled input batch handed to ``autoencoder.predict``.
        autoencoder: Object exposing ``predict(x, verbose=...)``.
        n: Number of chained passes, i.e. augmentations to return.
        noise_level: Currently unused; no extra noise is added to the outputs.

    Returns:
        ``np.ndarray`` stacking the ``n`` successive predictions along a new
        leading axis.
    """
    chain = []
    latest = original_signal_scaled

    for _pass in range(n):
        # Each pass consumes the previous output, so the results form a chain
        latest = autoencoder.predict(latest, verbose=0)
        chain.append(latest)

    return np.array(chain)

# Example usage: chain-augment the first two training signals and plot them
n_augmentations = 5  # augmentations generated per original signal
first_two_augmented_signals = []

for sample_idx in range(2):
    print(f"Augmenting signal for sample {sample_idx}")  # Debugging print
    # Slice (rather than index) so the batch axis is kept for predict()
    augmented_signals = augment_signal_n_times(
        X_train_scaled[sample_idx:sample_idx+1],
        autoencoder,
        n=n_augmentations,
    )
    first_two_augmented_signals.append(np.array(augmented_signals))

# Report the array shape collected for each sample
for idx, augmented_signals in enumerate(first_two_augmented_signals):
    print(f"Sample {idx} has {augmented_signals.shape} augmented signals.")  # Debugging: check structure

# Originals against their augmentations
plot_original_vs_augmented(X_train_scaled[:2], first_two_augmented_signals, n_augmentations)
