<a href="https://colab.research.google.com/github/djsabelo/BiosignalsDeepLearningWorkshop/blob/main/SignalProcessingForSynthesis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Signal Processing for  Synthesis

The application of Deep Learning for biosignals synthesis depends on the quality of signal processing and even on the quality of the raw signal.

In this workshop, we will go through the signal processing steps that are usually applied for biosignals synthesis using Python. Here, we will use **numpy** to ease mathematical operations, **matplotlib** to visualize the results of each step, **scipy** for specific processing operations and **os** to search data files.

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import os
from scipy.io import loadmat
from scipy.signal import decimate

In [2]:
!git clone https://github.com/djsabelo/BiosignalsDeepLearningWorkshop.git

fatal: destination path 'BiosignalsDeepLearningWorkshop' already exists and is not an empty directory.


## Reading Data

The first step for the application of signal processing is to read the files that contain the data. In this case, we will use ECG files from the Fantasia dataset, that were previously downloaded and, so, can be reached using **os**.

In [3]:
# Find files in folder
folder = './BiosignalsDeepLearningWorkshop/data/'
files = os.listdir(folder)

# Define sampling frequency
fs = 250

print(files)

['f1o04m.mat', 'f1o03m.mat', 'segments.npy', 'f1o02m.mat', 'f1o01m.mat', 'info', 'f1o05m.mat']


Having the list of files we can now read the data contained in them and store them in a list.

However, given that *.mat* files are read as dictionaries, we will simply store the values that correspond to the actual ECG signals. In this case, the key that identifies the signals is *'val'*, thus we index get the values contained in that key.

Moreover, each file contains a respiration signal and an ECG signal, but we will keep only the ECG of each file.

In [4]:
# Iterate over data files and store the data in a list
data = []

Let's check if the data is in the format we expected. In this case, since the sampling frequency is 250 Hz, we will build a time axis using the function **linspace** from **numpy** Python package.

In [5]:
# Get the first signal to plot
signal = data[0]

# Produce the time axis
time = np.arange(0, len(signal)/fs, 1/fs)

# Make a figure
plt.figure()
# Plot the signal in the figure
plt.plot(time, signal)
# Let's see only part of the signal
plt.xlim(500, 508)
# Show plot
plt.show()

IndexError: ignored

In [None]:
# Let's see the second signal
signal = data[1]
time = np.arange(0, len(signal)/fs, 1/fs)

plt.figure()
plt.plot(time, signal)
plt.xlim(500, 508)
plt.show()

## Normalise Data

The first step of signal processing is to normalise the signals. 

In this case, we will remove the mean and divide by the maximum of the absolute of the signal so that the resulting values of the signals will be between -0.5 and 0.5.

Since we have various signals, we will interate over the list that contains them and will store them in a list.

In [None]:
# Start the new list
normalised_data = []

# Iterate over all signals
# Remove Mean of signal
# Calculate absolute value of signal
# Get max value of absolute values of signal
# Divide value from the resulte of the remotion of the mean from the signal

Let's plot the same portions of the same signals. Note that the values of the y-axis changed.

In [None]:
# Let's see the first normalised signal
signal = normalised_data[0]
time = np.arange(0, len(signal)/fs, 1/fs)

plt.figure()
plt.plot(time, signal)
plt.xlim(500, 508)
plt.show()

In [None]:
# Let's see the second normalised signal
signal = normalised_data[1]
time = np.arange(0, len(signal)/fs, 1/fs)

plt.figure()
plt.plot(time, signal)
plt.xlim(500, 508)
plt.show()

## Subsampling

Given that Deep Learning models are "slow" during training, due to the number of iterations and complexity of operations that are required, we will subsample our data, which will save us some time when training our model.

To do this, we will use _decimate_, which is a function from **scipy** which handles this operation maintaing the integrity of the data.

In [None]:
# Build the list to store the subsampled data
subsampled_data = []

# Iterate over the normalised data and subsample using decimate with factor=5

Now that we changed the _virtual_ sampling frequency by subsampling the signal, we can define the new value by dividing the original by the factor of subsampling:

In [None]:
fs /= 5

In [None]:
# Let's see the first subsampled signal
signal = subsampled_data[0]
time = np.arange(0, len(signal)/fs, 1/fs)

plt.figure()
plt.plot(time, signal)
plt.xlim(500, 508)
plt.show()

In [None]:
# Let's see the second subsampled signal
signal = subsampled_data[1]
time = np.arange(0, len(signal)/fs, 1/fs)

plt.figure()
plt.plot(time, signal)
plt.xlim(500, 508)
plt.show()

## Denoise Signals

An important step in most biosignals processing pipelines is to reduce noise in signals. In this sense, we will implement an average filter, which consists on the substitution of each data point by the mean value of its *n* neighbours.

This allows to reduce high frequency noise and to smoothen the signal by choosing a sufficiently large *n*. However, if that value is too large, it will distort the signal and keep only very low frequencies - something we will use to our advantage.

However, there is the problem of the extremes, which does not have neighbours in one of their sides (in the beginning there is not previous values and in the end there is no next values). Thus, our first step will be to build a new array that has the length of the signal plus 2 times the number *n* that will serve as the extremes. The new positions will be filled mirroring of the signal relative to the extremes.

We will use the previously defined signal and *n*=3 for the development.

In [None]:
# Define n
n = 3

# Make an array containing only zeros and with length = length_of_signal + 2*n
extremes_zeros

To make this new array, we will iterate over the subsampled signal and fill the new array according to our description. Thus, the first value of the new array will correspond to the *n*-th value of the subsampled array, the second value will correspond to the *n-1*-th value and so on. When we reach the end of the signal, once we want to mirror the extremes, we fill the next position with the previous value and so on.

In [None]:
# Iterate over the one signal and fill the "extreme_zeros" array appropriately:
  # Mirror the start of the signal
  # Mirror the end of the signal
  # Fill the remaining with the signal

To show the resulting array, we will plot it and show also the subsampled array you previously saw.

In [None]:
# Let's see the auxiliary signal
mirror_time = np.arange(0, len(extremes_zeros)/fs, 1/fs)

# These prints should be "True"
print(all(extremes_zeros[n:-n] == signal))
print(all(extremes_zeros[:n+1] == signal[:n+1][::-1]))
print(all(extremes_zeros[-n-1:] == signal[-n-1:][::-1]))

plt.figure()
plt.plot(mirror_time, extremes_zeros)
plt.plot(time + n/fs, signal)
plt.show()

Now that we have our auxiliary array with all the values we need, let's build the new array and fill each position with the mean value of its *n* neighbours.

In [None]:
# Build the array
smoothen_signal = np.zeros(len(signal))

# Fill the array with the appropriate values

Let's check the result.

In [None]:
# Let's see the smoothened signal
print(smoothen_signal)

plt.figure()
plt.plot(time, smoothen_signal)
plt.xlim(500, 508)
plt.show()

To facilitate our job later on, we will save this part of the code in a reusable function that can be run whenever we want. As arguments, we will have the signal and the value *n* and the function will return the smoothened signal.

In [None]:
def smooth(signal, n):
  pass

We can now iterate over the subsampled signals, apply this function and save the resulting smoothened signals in a new list.

In [None]:
# Build the list where we will save the smoothened signals to
smoothened_data = []

# Iterate over the subsampled data, smoothen the signals and stored them in smoothened_data

## Remove Baseline Wander

The previous step allowed us to remove high-frequency noise from the signal, however, we still need to take care of the low-frequency noise that appears as baseline wander. This kind of noise can be originated by the respiration process which changes the distance between the ECG electrodes when the chest is expanding and contracting.

To reduce this kind of noise we can resource to the smooth function that we previously defined. Namely, if we define a sufficiently large *n* we will keep only the low-frequency components of the signal, which is what we want to remove. Thus, we simply need to subtracte this result from our signal.

Let's see how to do just this.

The first step will be to apply the smoothing function with *n*=40 (a large number) to the previously defined signal.

In [None]:
signal = smoothened_signal

# Calculate the baseline wander using n=40 and the smooth function we defined
baseline_wander

In [None]:
plt.figure()
plt.plot(signal)
plt.plot(baseline_wander)
plt.show()

The next step consists on subtracting this low-frequency signal from the original signal:

In [None]:
# Subtract the baseline_wander signal from the signal
filtered_signal

In [None]:
plt.figure()
plt.plot(signal)
plt.plot(filtered_signal)
plt.plot(baseline_wander)
plt.xlim(75000, 75908)
plt.show()

So, we can apply this to all our signals and save them in a new list.

In [None]:
# Build the list where we will save the denoised signals to
denoised_data = []

# Iterate over the smoothened signals, remove baseline wander and store them in denoised_data

## Remove minimum

The next step is to subtract the minimum value from the signal so that the values are in interval [0, 1], which will be useful for the Deep Learning algorithms.

In [None]:
signal = denoised_signal

# Subtract the minimum value of the signal
interval_signal

Let's plot both signals to see the difference.

In [None]:
plt.figure()
plt.plot(signal)
plt.plot(interval_signal)
plt.xlim(75000, 75908)
plt.show()

Now we will apply the same processing step to all denoised signals and store them in a list for later utilization.

In [None]:
# Build the list where we will save the new signals to
interval_data = []

# Iterate over the filtered signals, apply the same processing and store the results in interval_data

## Quantization

Quantization is an essential step in Deep Learning applications and consists on reducing the dimensionality of our dataset by defining a given range of possible values that the signal can have.

Thus, if we define that the number of possible values is *d*, then we divide the range of values that the signal currently has (between 0 and 1 in our case) by *d* and then, we map each current value to the corresponding new value. 

Let's use the last defined *signal* variable and *d*=1000 to visualise what we mean.

In [None]:
d = 1000
signal = interval_signal

Since the lowest possible value is 0 and the highest possible value is **d** times max(signal), we will first multiply **d** by our signal in order to convert it to the new scale. 

In [None]:
# Scale the signal
scaled_signal

Then, we need to round each resulting value to the closest integer and cast it to **int**.

In [None]:
# Round to integers
rounded_signal

# Cast them to int
quantised_signal

The result depends on the number *d* we choose. For example, for *d*=1000, we will have:

In [None]:
plt.plot(quantised_signal);
plt.xlim(75400, 75908)

Let's try the same procedure with a much smaller *d* to see the differences.

In [None]:
d = 50
# Copy the code above here

In [None]:
plt.plot(quantised_signal);
plt.xlim(75400, 75908)

So, we are reducing the dimensionality of the data and the signal becomes more "squared". This will allow our Deep Learning model to learn the values much easier as there are fewer values to pick from.

However, to keep the shape of the signal, we will use *d* = 1000 and apply this processing step to all our signals.

In [None]:
d = 1000

# Build the list where we will save the new signals to
quantised_data = []

# Iterate over the interval data, quantise each signal and store them

## Segmentation

The last processing step will be the segmentation of our signal. In this case, since our problem is to synthesise signals, we need to use a sliding window segmentation procedure with overlap. Each segment must have at least 2 heartbeats and the size of each segment must have a number 2<sup>n</sup>+1 of data points. Moreover, we will use overlap to increase the size of our dataset.

In [None]:
signal = quantised_signal
size_of_segment = 257 # 2^8 + 1
step = 50

In [None]:
# List where we will store the segments of the signal
segments = []

# Iterate over the signal with a step size corresponding to the overlap
  # Save each segment in segments

Let's see some segments.

In [None]:
for segment in segments[:10]:
  plt.figure()
  plt.plot(segment)
  plt.show()

Finally, our task is to do the same for all signals. Once again, we will build a function that can be invoked whenever we want to improce code readibility. This function will have the signal, the size of segments and the step as arguments and will return the list of segments.

In [None]:
def segmentation(signal, size_of_segment, step):
  pass

Now we can use this function and apply it to all of our signals.

In [None]:
d = 1000

# Build the list where we will save the new signals to
segmented_data = []

# Iterate over the quantised data, segment the signals and save them in segmented_data