# Denoising Autoencoder
In this script I try to implement a denoising autoencoder for PPG signals.
Since the data provided for the Applied AI in Biomedicine project course comprise artifacs, I will use an external dataset of PPG signals to train the model. A Denoising Autoencoder is an architecture proposed by Vincent et al. in 2008 aiming to cancel the noise on images, allowing to produce a clean image (https://www.cs.toronto.edu/~larocheh/publications/icml-2008-denoising-autoencoders.pdf). I will try to implement a version of the denoising autoencoder for signals using 1D convolutions over the input vector.
Due to the non-effectiveness of this method in our context, a plain autoencoder will be trained in order to cope with the filtering of the dataset.

## Import Libraries


In [None]:
# General purpose libraries
import numpy as np
import pandas as pd
import csv
import os

# Plotting libraries
import matplotlib.pyplot as plt
import seaborn as sns

# PPG library
!pip install neurokit2
import neurokit2 as nk

# Optimization library
!pip install optuna
import optuna

# Deep learning libraries
import tensorflow as tf
from tensorflow import keras as tfk
from keras import layers as tfkl
from keras import backend as K

## Denoising Autoencoder Class
The following class provides all the methods to deal with the creation and training of a denoising autoencoder.

In [None]:
# Define the class for the autoencoder

class DenoisingAutoencoder:
    """
    @description: built a Denoising Autoencoder using TensorFlow and Keras layers.

    @params:
    - lenght (int): length of the input vector
    - filters (tuple of int): number of filters for the specified convolutional layers of the encoder
    - latentDim (int): size of the latent dimension of the autoencoder

    @return: none
    """
    def __init__(self,length, filters=(32, 64), latentDim=16, lr = 0.001):
      self.length = length
      self.filters = filters
      self.latentDim = latentDim
      self.lr = lr

    def build(self):
        """
        @description: build the autoencoder model. (1) builds the encoder,
                    (2) builts the decoder, (3) builds the model, (4) compile the model

        @params: the object itself

        @return: none
        """

        encoder_inputs = tfkl.Input(shape=(self.length, 1))

        # 1. Create the encoder
        encoder = encoder_inputs

        if type(self.filters) != int:
          for f in self.filters:
            # 1.1. Convolutional layer
            encoder = tfkl.Conv1D(filters = f, kernel_size = 3, padding = 'same',
                                  activation = 'relu', kernel_initializer=tfk.initializers.HeNormal(seed=None))(encoder)
        else:
            encoder = tfkl.Conv1D(filters = self.length, kernel_size = 3,
                                  padding = 'same', activation = 'relu',
                                  kernel_initializer=tfk.initializers.HeNormal(seed=None))(encoder)
        # 2.1. Batch Normalization
        encoder = tfkl.BatchNormalization()(encoder)

        # Flatten the output of the last conv1d layer
        flattened = tfkl.Flatten()(encoder)

        # Create the latent space
        latent_space = tfkl.Dense(units = self.latentDim)(flattened)

        # 2. Create the decoder.
        # The decoder will accept as input the output of the encoder.
        if type(self.filters) != int:
          decoded = tfkl.Dense(self.filters[-1]*(self.length), activation = 'relu')(latent_space)
          # Reshape the decoder
          decoded = tfkl.Reshape((self.length, self.filters[-1]))(decoded)
        else:
          decoded = tfkl.Dense(self.filters*(self.length), activation = 'relu')(latent_space)
          # Reshape the decoder
          decoded = tfkl.Reshape((self.length, self.filters))(decoded)

        # Define the decoder layers
        if type(self.filters) != int:
          for f in reversed(self.filters[:-1]):
              decoded = tfkl.Conv1DTranspose(filters = f, kernel_size = 3, activation='relu', padding = 'same')(decoded)
        else:
              decoded = tfkl.Conv1DTranspose(filters = self.filters, kernel_size = 3, activation='relu', padding = 'same')(decoded)
        # Define the decoder's output
        decoder_output = tfkl.Conv1DTranspose(filters = 1, kernel_size = 3,
                                        activation='linear', padding = 'same')(decoded)

        # Build the model
        model = tfk.models.Model(encoder_inputs, decoder_output, name='denoising_autoencoder')

        # Define the optimizator
        opt = tfk.optimizers.Adam(learning_rate = self.lr)

        # Compile the model
        model.compile(loss='mse', optimizer = opt, )

        self.denoisingAutoencoder = model

        # self.denoisingAutoencoder.summary()


    def train(self, X_train, X_train_noisy, X_val, X_val_noisy, callbacks,
                epochs = 20, batch_size = 64):
        """
        @description: train a denoising autoencoder

        @params:
        - X_train (vector of int): training signals
        - X_train_noisy (vector of int): noisy training signals
        - X_val (vector of int): validation signals
        - X_val_noisy (vector of int): noisy validation signals
        - callbacks (vector of tensorflow's callbacks): the callbacks to be included in the training
        - epochs (int): number of (maximum) training epochs
        - batch_size: (int) batch_size dimension

        @return: none
        """
        if X_val_noisy is not None and X_val is not None:
          # Train monitoring validation loss
          self.history = self.denoisingAutoencoder.fit(
              x=X_train_noisy,
              y=X_train,
              batch_size=batch_size,
              epochs=epochs,
              validation_data=(X_val_noisy, X_val),
              callbacks=callbacks
          ).history

          # Plot the training history
          self._plotHistory(epochs)
        else:
          # Train without monitoring validation loss
          self.history = self.denoisingAutoencoder.fit(
              x=X_train_noisy,
              y=X_train,
              batch_size=batch_size,
              epochs=epochs
          ).history



        # Save the model
        self.denoisingAutoencoder.save("DenoisingAutoencoder")

        return self.denoisingAutoencoder

    def _plotHistory(self, epochs):
        """
        @description: plot the history of the training of a denoising autoencoder

        @params:
        - epochs (int): number of epochs

        @return: none
        """

        history = self.history

        best_epoch = np.argmin(history['val_loss'])
        plt.figure(figsize=(17,4))
        plt.plot(history['loss'][1:], label='Training loss', alpha=.8, color='#ff7f0e')
        plt.plot(history['val_loss'][1:], label='Validation loss', alpha=.9, color='#5a9aa5')
        plt.axvline(x=best_epoch, label='Best epoch', alpha=.3, ls='--', color='#5a9aa5')
        plt.title('Categorical Crossentropy')
        plt.legend()
        plt.grid(alpha=.3)
        plt.show()

    def setLearningRate(self, lr):
        self.lr = lr


## Data Generation

In [None]:
# Generate 100 PPG signals
signals = []
for i in range(200):
  bpm = np.random.randint(65, 80)
  fs = 250
  ppg = nk.ppg_simulate(duration=200, sampling_rate=fs, heart_rate=bpm,
                        ibi_randomness = 0.01, frequency_modulation = 0.1,
                        drift = 0.1, motion_amplitude = 0.1,)
  signals.append({'ppg': ppg, 'bpm': bpm})


In [None]:
# Plot one signal
idx = 18
sample_frequency = 128 # Hz
time_values = np.arange(len(signals[len(signals) - 1]['ppg'])) / sample_frequency
# Plot the signal
plt.plot(time_values, signals[idx]['ppg'])
plt.xlabel('Time (seconds)')
plt.ylabel('Signal Amplitude')
plt.title('Signal Plot')
plt.grid(True)
plt.xlim(0, 25)
plt.show()

In [None]:
# Generate the dataset to be feed to the autoencoder

# Define window and stride
window = 200
stride = 50

dataset = []

for ppg in signals:
    # Take only meaningful features
    temp = ppg['ppg']

    # Standardize the signal
    mean = np.mean(temp)
    std_dev = np.std(temp)
    standardized_temp = (temp - mean) / std_dev

    # Compute padding length
    padding_len = window - len(temp)%window
    # Create padding and concatenate it
    padding = np.zeros(padding_len, dtype='float32')
    temp = np.concatenate((temp, padding))
    # Build features windows with their corresponging labels
    idx = 0
    while idx+window <= len(temp):
        dataset.append(temp[idx:idx+window])
        idx += stride
dataset = np.array(dataset) # composed of chunks and labels

dataset.shape


In [None]:
# Perform the train-validation-test split
X_train = dataset[:80000]
X_val = dataset[80000:95000]
X_test = dataset[95000:len(dataset)]

print(X_train.shape)
print(X_val.shape)
print(X_test.shape)


In [None]:
# Generate the noisy samples
trainNoise = np.random.normal(loc=np.mean(X_train), scale= 0.1, size=X_train.shape)
valNoise = np.random.normal(loc=np.mean(X_val), scale= 0.1, size=X_val.shape)
testNoise = np.random.normal(loc=np.mean(X_test), scale= 0.1, size=X_test.shape)

X_train_noisy = np.clip(X_train + trainNoise, np.min(X_train + trainNoise), np.max(X_train + trainNoise))
X_val_noisy = np.clip(X_val + valNoise, np.min(X_val + valNoise), np.max(X_val + valNoise))
X_test_noisy = np.clip(X_test + testNoise, np.min(X_test + testNoise), np.max(X_test + testNoise))

# Set some noisy samples identical to the plain samples, in such a way that the model
# does not learn only to modify the signal
i = 0
rnd_train = []
while(i < int(X_train_noisy.shape[0]* 0.4)):
  rnd = np.random.randint(0,X_train_noisy.shape[0])
  rnd_train.append(rnd)
  X_train_noisy[rnd] = X_train[rnd]
  i += 1

i = 0
rnd_val = []
while(i < int(X_val_noisy.shape[0]* 0.4)):
  rnd = np.random.randint(0,X_val_noisy.shape[0])
  rnd_val.append(rnd)
  X_val_noisy[rnd] = X_val[rnd]
  i += 1

i = 0
rnd_test = []
while(i < int(X_test_noisy.shape[0]* 0.4)):
  rnd = np.random.randint(0,X_test_noisy.shape[0])
  rnd_test.append(rnd)
  X_test_noisy[rnd] = X_test[rnd]
  i += 1

# Add some artifacts to some of the samples (excluded the previous ones)
i = 0
while(i < int(X_train_noisy.shape[0]* 0.3)):
  rnd = np.random.randint(0,X_train_noisy.shape[0])
  while rnd in rnd_train:
    rnd = np.random.randint(0,X_train_noisy.shape[0])

  num_spikes = np.random.randint(100, 200)
  spike_indices = np.random.randint(0, len(X_train_noisy[rnd]), num_spikes)
  spike_magnitudes = np.random.randint(-50, 50, num_spikes)
  for idx, mag in zip(spike_indices, spike_magnitudes):
    X_train_noisy[rnd][idx] += mag
  i += 1

i = 0
while(i < int(X_val_noisy.shape[0]* 0.3)):
  rnd = np.random.randint(0,X_val_noisy.shape[0])
  while rnd in rnd_val:
    rnd = np.random.randint(0,X_val_noisy.shape[0])

  num_spikes = np.random.randint(100, 200)
  spike_indices = np.random.randint(0, len(X_val_noisy[rnd]), num_spikes)
  spike_magnitudes = np.random.randint(-50, 50, num_spikes)
  for idx, mag in zip(spike_indices, spike_magnitudes):
    X_val_noisy[rnd][idx] += mag
  i += 1


i = 0
while(i < int(X_test_noisy.shape[0]* 0.3)):
  rnd = np.random.randint(0,X_test_noisy.shape[0])
  while rnd in rnd_test:
    rnd = np.random.randint(0,X_test_noisy.shape[0])

  num_spikes = np.random.randint(100, 200)
  spike_indices = np.random.randint(0, len(X_test_noisy[rnd]), num_spikes)
  spike_magnitudes = np.random.randint(-50, 50, num_spikes)
  for idx, mag in zip(spike_indices, spike_magnitudes):
    X_test_noisy[rnd][idx] += mag
  i += 1




In [None]:
idx = rnd
# Plot one signal
sample_frequency = 128 # Hz
time_values = np.arange(len(X_test[idx]))
# Plot the signal
plt.plot(time_values, X_test[idx])
plt.xlabel('Time (seconds)')
plt.ylabel('Signal Amplitude')
plt.title('Signal Plot')
plt.grid(True)
plt.xlim(30, len(X_test[idx]))
plt.show()

# Plot one signal
sample_frequency = 128 # Hz
time_values = np.arange(len(X_test_noisy[idx]))
# Plot the signal
plt.plot(time_values, X_test_noisy[idx])
plt.xlabel('Time (seconds)')
plt.ylabel('Signal Amplitude')
plt.title('Signal Plot')
plt.grid(True)
plt.xlim(30, len(X_test_noisy[idx]))
plt.show()

In [None]:
# Set the correct shapes
X_train = np.expand_dims(X_train, axis = -1)
X_val = np.expand_dims(X_val, axis = -1)
X_test = np.expand_dims(X_test, axis = -1)


X_train_noisy = np.expand_dims(X_train_noisy, axis = -1)
X_val_noisy = np.expand_dims(X_val_noisy, axis = -1)
X_test_noisy = np.expand_dims(X_test_noisy, axis = -1)

print("Train shapes: ", X_train.shape, X_train_noisy.shape)
print("Validation shapes: ", X_val.shape, X_val_noisy.shape)
print("Test shapes: ", X_test.shape, X_test_noisy.shape)

In [None]:
X_train = np.expand_dims(X_train, axis = -1)
X_val = np.expand_dims(X_val, axis = -1)
X_test = np.expand_dims(X_test, axis = -1)

print("Train shapes: ", X_train.shape)
print("Validation shapes: ", X_val.shape)
print("Test shapes: ", X_test.shape)

## Train the Model

In [None]:
# Define a denoising autoencoder instance
input_shape = X_train.shape
denoising_autoencoder = DenoisingAutoencoder(length = input_shape[1], filters=(8, 16), lr = 1e-3)

# Build the model
denoising_autoencoder.build()

In [None]:
callbacks = [
        tfk.callbacks.EarlyStopping(monitor='val_loss', mode='max', patience=15, restore_best_weights=True),
        tfk.callbacks.ReduceLROnPlateau(monitor='val_loss', mode='max', patience=5, factor=0.5)
]
# Train the model
denoising_autoencoder.train(X_train = X_train, X_train_noisy = X_train_noisy, X_val = X_val, X_val_noisy=X_val_noisy, callbacks=callbacks)


In [None]:
del denoising_autoencoder

## Hyperparameter tuning of the Denoising Autoencoder

In [None]:
callbacks = [
             tfk.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
             tfk.callbacks.ReduceLROnPlateau(monitor='val_loss', patience=5, factor=0.5)
            ]

# Objective function
def objective_function(optuna_trial):

    denoisingAutoencoder = DenoisingAutoencoder(
                                                length = X_train.shape[1],
                                                lr = 0.01,
                                                filters=optuna_trial.suggest_categorical('filters', [(256, 128, 64, 32), (128, 64, 32), (64, 32), (64, 32, 16), (32, 18)]),
                                                latentDim=optuna_trial.suggest_categorical('latentDim', [128, 64, 32, 16, 8])
                                                )
    denoisingAutoencoder.build()

    # Train the model
    model = denoisingAutoencoder.train(X_train = X_train,
                                       X_train_noisy = X_train,
                                       X_val = X_val,
                                       X_val_noisy = X_val,
                                       callbacks = callbacks,
                                       batch_size=optuna_trial.suggest_categorical("batch_size", [256, 128, 64, 32, 16])
                                       )

    predictions = model.predict(X_val)
    mse = np.mean(np.square(predictions - X_val))

    # delete the model and the DenoisingAutoencoder object
    del model
    del denoisingAutoencoder

    return mse


In [None]:

# Define the optimization study
optuna_study = optuna.create_study(direction="minimize")

optuna_study.optimize(objective_function, n_trials = 20)

In [None]:
optuna_study.best_trial.params

## Finetune the Model with the Best Parameters

In [None]:
# {'filters': (64, 32), 'latentDim': 64, 'batch_size': 128}
input_shape = X_train.shape
callbacks = [
             tfk.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True),
             tfk.callbacks.ReduceLROnPlateau(monitor='val_loss', patience=5, factor=0.5)
            ]

# Define a denoising autoencoder instance
denoising_autoencoder = DenoisingAutoencoder(length = input_shape[1], lr = 0.002581221789660623, filters = (64, 32), latentDim = 64)
denoising_autoencoder.build()
denoising_autoencoder.train(X_train = X_train, X_train_noisy = X_train, X_val = X_val, X_val_noisy=X_val, callbacks=callbacks, batch_size=128, epochs = 50)

In [None]:
# Finetune the model with a low learning rate
denoising_autoencoder.setLearningRate(lr=1e-5)
denoising_autoencoder.train(X_train = X_train, X_train_noisy = X_train, X_val = X_val, X_val_noisy=X_val, callbacks=callbacks, batch_size=16, epochs = 25)

In [None]:
# Retrain the model on the whole dataset (with a small learning rate)
denoising_autoencoder.setLearningRate(lr=1e-5)
denoising_autoencoder.train(X_train = np.concatenate((X_train, X_val), axis=0),
                            X_train_noisy = np.concatenate((X_train, X_val), axis=0),
                            X_val = None, X_val_noisy=None, callbacks=None,
                            batch_size=128, epochs = 15)

In [None]:
del denoising_autoencoder

## Test the Denoising Autoencoder

In [None]:
# Load the model
model = tfk.models.load_model('DenoisingAutoencoder')

In [None]:
# Remove the noise from the test set
denoised_data = model.predict(X_test)

In [None]:
# Define the sampling frequency and time values based on the length of the signal
sample_frequency = 128  # Hz
time_values = np.arange(len(X_test[1])) / sample_frequency

# Rest of the code remains the same for plotting...

# Create a single plot with subplots for each signal
fig, axs = plt.subplots(2, 1, figsize=(8, 8))
index = 4
# Real Signal
axs[0].plot(time_values, X_test[index])
axs[0].set_xlabel('Time (seconds)')
axs[0].set_ylabel('Signal Amplitude')
axs[0].set_title('Real Signal')
axs[0].grid(True)
axs[0].set_xlim(30 / sample_frequency, len(X_test[index]) / sample_frequency)

# Noisy Signal
"""
axs[1].plot(time_values, X_test_noisy[index])
axs[1].set_xlabel('Time (seconds)')
axs[1].set_ylabel('Signal Amplitude')
axs[1].set_title('Noisy Signal')
axs[1].grid(True)
axs[1].set_xlim(30 / sample_frequency, len(X_test_noisy[index]) / sample_frequency)
"""
# Denoised Signal
axs[1].plot(time_values, denoised_data[index])
axs[1].set_xlabel('Time (seconds)')
axs[1].set_ylabel('Signal Amplitude')
axs[1].set_title('Denoised Signal')
axs[1].grid(True)
axs[1].set_xlim(30 / sample_frequency, len(denoised_data[index]) / sample_frequency)

# Adjust layout and save the whole figure as an image
plt.tight_layout()
# plt.savefig('combined_signals.png')  # Save as an image file
plt.show()
