# Imports

In [2]:
import biosignalsnotebooks as bsnb

# Scientific programming package.
from numpy import average, array, reshape, sqrt, sort, diff, where, argmax, max
from numbers import Number

# Pacakge dedicated to Wavelet decomposition algorithms.
from pywt import swt, iswt

# Machine-learning dedicated package.
from sklearn.mixture import GaussianMixture

# Gaussian Distribution function.
from scipy.stats import norm

# Built-in packages.
from copy import deepcopy

# Loading Signals

In [3]:
# Load entire acquisition data.
data, header = bsnb.load("../archive/data/staticopensignalsdata.txt", get_header=True)

# Store the desired channel (CH3) in the "signal" variable
signal = data["CH5"]

# Sampling rate definition.
sr = header["sampling rate"]

# Raw to uS sample value conversion.
signal_us = bsnb.raw_to_phy("EDA", "bioplux", signal, 12, "uS")

# Filtering Stage 1 (Smoothing the EDA-Signal)

In [4]:
# Conventional 1st order Butterworth bandpass filtering 
# (due to the restrictive bandwidth in order to guarantee system stability it is necessary to reduce the filter order)
# [Option RES]
signal_res = bsnb.bandpass(signal_us, 0.045, 0.25, order=1, fs=sr, use_filtfilt=True)

# Conventional 2nd order Butterworth lowpass filtering 
# [Option EXT]
signal_ext = bsnb.lowpass(signal_us, 35, order=2, fs=sr, use_filtfilt=True)

# Filtering Stage 2 (Minimizing Influence of Motion Artifacts)

### Decomposition of signal_ext using Stationary Wavelet Transform (SWT)

In [5]:
# SWT 8th level Decomposition using "Haar" mother wavelet (taking into consideration its ability to detect "edges" in the signal) .
swt_orig_coeffs = swt(signal_ext[:32768], "haar", level=8)

# Restriction of filtering algorithm to the coefficients of the last decomposition level.
detail_coeffs = swt_orig_coeffs[0][1]
scaling_coeffs = swt_orig_coeffs[0][0]

### Generation of a Gaussian Mixture Model

In [6]:
gaussian_mixt = GaussianMixture(n_components=2, covariance_type="spherical") # "spherical" covariance type ensures that each Gaussian components has its own single variance.

# Reshape data to a column vector format.
detail_coeffs_col = reshape(detail_coeffs, (len(detail_coeffs), 1))

# Fit data to our model object.
gaussian_mixt.fit(detail_coeffs_col)

### Determination of the Cumulative Density Function of the previously defined Gaussian Mixture

In [7]:
# Normal distribution function objects.
norm_1 = norm(loc=gaussian_mixt.means_[0][0], scale=sqrt(gaussian_mixt.covariances_[0])) 
norm_2 = norm(loc=gaussian_mixt.means_[1][0], scale=sqrt(gaussian_mixt.covariances_[1])) 

# Component weights.
weight_1 = gaussian_mixt.weights_[0]
weight_2 = gaussian_mixt.weights_[1]

# CDF values for the coefficients under analysis.
sort_detail_coeffs = sort(detail_coeffs)
norm_1_cdf = norm_1.cdf(sort_detail_coeffs)
norm_2_cdf = norm_2.cdf(sort_detail_coeffs)

# CDF of the Gaussian mixture.
cdf_mixt = weight_1 * norm_1_cdf + weight_2 * norm_2_cdf

### Definition of Motion Artifact Removal Thresholds using the CDF of the previously defined Gaussian Mixture Model, considering an Artifact Proportion value equal to 0.01

In [8]:
art_prop = 0.01 # Artifact proportion value.
low_thr = None 
high_thr = None

# Check when the CDF mixture function reaches values art_prop / 2 and 1 - art_prop / 2.
for i in range(0, len(norm_1_cdf)):
    # Low threshold clause.
    if cdf_mixt[i] - cdf_mixt[0] >= art_prop and low_thr == None:
        low_thr = sort_detail_coeffs[i]
        
    # High threshold clause.
    if cdf_mixt[-1] - cdf_mixt[i] <= art_prop and high_thr == None:
        high_thr = sort_detail_coeffs[i]

### Removal of Wavelet Coefficients related with Motion Artifacts

In [9]:
filt_detail_coeffs = deepcopy(detail_coeffs)
count_1 = 0
count_2 = 0
for j in range(0, len(filt_detail_coeffs)):
    if detail_coeffs[j] <= low_thr or detail_coeffs[j] >= high_thr:
        filt_detail_coeffs[j] = 0
    else:
        continue
        
# Update of the SWT decomposition tupple.
swt_coeffs = [(array(scaling_coeffs), array(filt_detail_coeffs))]

### Signal Reconstruction through an inverse SWT

In [10]:
rec_signal = iswt(swt_coeffs, "haar")

# Filtering Stage 3 (Smoothing)

In [15]:
signal_int = bsnb.smooth(rec_signal, sr * 3) # Moving average witt window size equal to 3 x sampling rate.

# Rescale signal.
signal_int = signal_int * (max(signal_ext) / max(signal_int))

# Parameter Extraction

In [20]:
# Parameter extraction.
param_dict = {}

#[Latency to stimulus onset]
signal_2nd_der = diff(diff(signal_int)) # Generation of 2nd derivative function.
der_thr = max(signal_2nd_der) # Identification of our major concavity point (start of response time)
response_sample = argmax(signal_2nd_der)
response_time = response_sample / sr
param_dict["Latency to stimulus onset"] = response_time - 0

#[EDR amplitude]
eda_max = max(signal_int)
eda_basal = signal_int[response_sample]
param_dict["EDR amplitude"] = eda_max - eda_basal

#[EDR rising time (Rise Time)]
eda_max_sample = argmax(signal_int)
eda_max_time = eda_max_sample / sr
param_dict["EDR rising time (Rise Time)"] = eda_max_time - response_time

#[EDR response peak (Peak Time)]
param_dict["EDR response peak (Peak Time)"] = eda_max_time

#[Recovery time to 50% amplitude]
time_50 = None
for i in range(eda_max_sample, len(signal_int)):
    if signal_int[i] <= eda_max - 0.50 * param_dict["EDR amplitude"]:
        time_50 = i / sr
        break
if time_50 is not None:
    param_dict["Recovery time to 50% amplitude"] = time_50 - eda_max_time
else:
    param_dict["Recovery time to 50% amplitude"] = None

#[Recovery time to 63% amplitude]
time_63 = None
for i in range(eda_max_sample, len(signal_int)):
    if signal_int[i] <= eda_max - 0.63 * param_dict["EDR amplitude"]:
        time_63 = i / sr
        break
    
if time_63 is not None:
    param_dict["Recovery time to 63% amplitude"] = time_63 - eda_max_time
else:
    param_dict["Recovery time to 63% amplitude"] = None