In [4]:
import numpy as np
import mne
from scipy import signal
from scipy.interpolate import RectBivariateSpline
from mne.filter import resample, filter_data
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from lspopt import spectrogram_lspopt
from matplotlib.colors import Normalize, ListedColormap

import logging
LOGGING_TYPES = dict(DEBUG=logging.DEBUG, INFO=logging.INFO, WARNING=logging.WARNING,
                     ERROR=logging.ERROR, CRITICAL=logging.CRITICAL)
logger = logging.getLogger('yasa')

%matplotlib qt

In [5]:
# Load the EDF file
fname = "P8_N3"  # define here
lr = "L"  # define here
location = f"/Users/amirhosseindaraie/Desktop/data/autoscoring-material/data/Zmax Donders/{fname}"
raw = mne.io.read_raw_edf(f"{location}/EEG {lr}.edf", preload=True, verbose=0)
raw.pick_types(eeg=True)
# fig = raw.plot(use_opengl=False)

# Apply a zero-phase bandpass filter between 0.5 ~ 45 Hz
raw.filter(0.5, 45)

# Plot properties of the filter
filt = mne.filter.create_filter(raw._data, 256, 0.5, 40)
mne.viz.plot_filter(filt, 256)
plt.savefig("filter shape.png", dpi=100, bbox_inches="tight")

# Extract the data and convert from V to uV
data = raw._data * 1e6
sf = raw.info["sfreq"]
chan = raw.ch_names

# Let's have a look at the data
print("Chan =", chan)
print("Sampling frequency =", sf, "Hz")
print("Data shape =", data.shape)


def format_seconds_to_hhmmss(seconds):
    # Return hhmmss of total seconds parameter
    hours = seconds // (60 * 60)
    seconds %= 60 * 60
    minutes = seconds // 60
    seconds %= 60
    return "%02i:%02i:%02i" % (hours, minutes, seconds)


print(
    f"Duration: {data.shape[1]/sf} (sec) OR {format_seconds_to_hhmmss(data.shape[1]/sf)}"
)


Filtering raw data in 1 contiguous segment
Setting up band-pass filter from 0.5 - 45 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 0.50
- Lower transition bandwidth: 0.50 Hz (-6 dB cutoff frequency: 0.25 Hz)
- Upper passband edge: 45.00 Hz
- Upper transition bandwidth: 11.25 Hz (-6 dB cutoff frequency: 50.62 Hz)
- Filter length: 1691 samples (6.605 sec)

Setting up band-pass filter from 0.5 - 40 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 0.50
- Lower transition bandwidth: 0.50 Hz (-6 dB cutoff frequency: 0.25 Hz)
- Upper passband edge: 40.00 Hz
- Upper transition bandwidth

In [None]:
import antropy as ant
import scipy.signal as sp_sig
import scipy.stats as sp_stats
from numpy import apply_along_axis as apply

pd.set_option("display.max_columns", None)
pd.set_option("display.expand_frame_repr", False)
pd.set_option("max_colwidth", -1)

# Time vector in seconds
times = np.arange(data.size) / sf


def sliding_window(data, sf, window, step=None, axis=-1):
    """Calculate a sliding window of a 1D or 2D EEG signal.
    .. versionadded:: 0.1.7
    Parameters
    ----------
    data : numpy array
        The 1D or 2D EEG data.
    sf : float
        The sampling frequency of ``data``.
    window : int
        The sliding window length, in seconds.
    step : int
        The sliding window step length, in seconds.
        If None (default), ``step`` is set to ``window``,
        which results in no overlap between the sliding windows.
    axis : int
        The axis to slide over. Defaults to the last axis.
    Returns
    -------
    times : numpy array
        Time vector, in seconds, corresponding to the START of each sliding
        epoch in ``strided``.
    strided : numpy array
        A matrix where row in last dimension consists of one instance
        of the sliding window, shape (n_epochs, ..., n_samples).
    Notes
    -----
    This is a wrapper around the
    :py:func:`numpy.lib.stride_tricks.as_strided` function.
    Examples
    --------
    With a 1-D array
    >>> import numpy as np
    >>> from yasa import sliding_window
    >>> data = np.arange(20)
    >>> times, epochs = sliding_window(data, sf=1, window=5)
    >>> times
    array([ 0.,  5., 10., 15.])
    >>> epochs
    array([[ 0,  1,  2,  3,  4],
           [ 5,  6,  7,  8,  9],
           [10, 11, 12, 13, 14],
           [15, 16, 17, 18, 19]])
    >>> sliding_window(data, sf=1, window=5, step=1)[1]
    array([[ 0,  1,  2,  3,  4],
           [ 2,  3,  4,  5,  6],
           [ 4,  5,  6,  7,  8],
           [ 6,  7,  8,  9, 10],
           [ 8,  9, 10, 11, 12],
           [10, 11, 12, 13, 14],
           [12, 13, 14, 15, 16],
           [14, 15, 16, 17, 18]])
    >>> sliding_window(data, sf=1, window=11)[1]
    array([[ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10]])
    With a N-D array
    >>> np.random.seed(42)
    >>> # 4 channels x 20 samples
    >>> data = np.random.randint(-100, 100, size=(4, 20))
    >>> epochs = sliding_window(data, sf=1, window=10)[1]
    >>> epochs.shape  # shape (n_epochs, n_channels, n_samples)
    (2, 4, 10)
    >>> epochs
    array([[[  2,  79,  -8, -86,   6, -29,  88, -80,   2,  21],
            [-13,  57, -63,  29,  91,  87, -80,  60, -43, -79],
            [-50,   7, -46, -37,  30, -50,  34, -80, -28,  66],
            [ -9,  10,  87,  98,  71, -93,  74, -66, -20,  63]],
           [[-26, -13,  16,  -1,   3,  51,  30,  49, -48, -99],
            [-12, -52, -42,  69,  87, -86,  89,  89,  74,  89],
            [-83,  31, -12, -41, -87, -92, -11, -48,  29, -17],
            [-51,   3,  31, -99,  33, -47,   5, -97, -47,  90]]])
    """
    from numpy.lib.stride_tricks import as_strided

    assert axis <= data.ndim, "Axis value out of range."
    assert isinstance(sf, (int, float)), "sf must be int or float"
    assert isinstance(window, (int, float)), "window must be int or float"
    assert isinstance(step, (int, float, type(None))), (
        "step must be int, " "float or None."
    )
    if isinstance(sf, float):
        assert sf.is_integer(), "sf must be a whole number."
        sf = int(sf)
    assert isinstance(axis, int), "axis must be int."

    # window and step in samples instead of points
    window *= sf
    step = window if step is None else step * sf

    if isinstance(window, float):
        assert window.is_integer(), "window * sf must be a whole number."
        window = int(window)

    if isinstance(step, float):
        assert step.is_integer(), "step * sf must be a whole number."
        step = int(step)

    assert step >= 1, "Stepsize may not be zero or negative."
    assert window < data.shape[axis], (
        "Sliding window size may not exceed " "size of selected axis"
    )

    # Define output shape
    shape = list(data.shape)
    shape[axis] = np.floor(data.shape[axis] / step - window / step + 1).astype(int)
    shape.append(window)

    # Calculate strides and time vector
    strides = list(data.strides)
    strides[axis] *= step
    strides.append(data.strides[axis])
    strided = as_strided(data, shape=shape, strides=strides)
    t = np.arange(strided.shape[-2]) * (step / sf)

    # Swap axis: n_epochs, ..., n_samples
    if strided.ndim > 2:
        strided = np.rollaxis(strided, -2, 0)
    return t, strided


# Convert the EEG data to 30-sec data
times, data_win = sliding_window(data[0], sf, window=30)

# Convert times to minutes
times /= 60


def lziv(x):
    """Binarize the EEG signal and calculate the Lempel-Ziv complexity."""
    return ant.lziv_complexity(x > x.mean(), normalize=True)


In [34]:
# generate a plot for manuscript

begin = 3.5
end = 3.6

plt.figure(figsize=(8, 3))

raw = mne.io.read_raw_edf(f"{location}/EEG L.edf", preload=True, verbose=0)
raw.pick_types(eeg=True)
data = raw._data * 1e6
y = data[0]
t = np.arange(data.size) / sf / 60 / 60
plt.plot(
    t[np.where(t == begin)[0][0] : np.where(t == end)[0][0]] * 60,
    y[np.where(t == begin)[0][0] : np.where(t == end)[0][0]],
    c="mediumblue",
    label="EEG L"
)

raw = mne.io.read_raw_edf(f"{location}/EEG R.edf", preload=True, verbose=0)
raw.pick_types(eeg=True)
data = raw._data * 1e6
y = data[0]
plt.plot(
    t[np.where(t == begin)[0][0] : np.where(t == end)[0][0]] * 60,
    y[np.where(t == begin)[0][0] : np.where(t == end)[0][0]] + 130,
    c="dodgerblue",
    label="EEG R"
)
plt.xlim([t[np.where(t == begin)[0][0]] * 60, t[np.where(t == end)[0][0]] * 60])
plt.yticks([])
plt.xlabel("Time ($min$)")
plt.ylabel("EEG ($mV$)")
plt.title("Sleep EEG Wave")
plt.legend()
plt.tight_layout()
plt.savefig("sleep_EEG_wave.svg")
plt.show()

# raw.filter(0.5, 45)
# data = raw._data * 1e6
# y = data[0]
# plt.plot(
#     t[np.where(t == begin)[0][0] : np.where(t == end)[0][0]] * 60,
#     y[np.where(t == begin)[0][0] : np.where(t == end)[0][0]],
# )


In [None]:
data_win.shape

In [None]:
# Calculate standard descriptive statistics
hmob, hcomp = ant.hjorth_params(data_win, axis=1)

# Feature extraction
df_feat = {
    # Statistical
    "std": apply(np.std, arr=data_win, axis=1, ddof=1),
    "mean": apply(np.mean, arr=data_win, axis=1),
    "median": apply(np.median, arr=data_win, axis=1),
    "iqr": apply(sp_stats.iqr, arr=data_win, axis=1, rng=(25, 75)),
    "skew": apply(sp_stats.skew, arr=data_win, axis=1),
    "kurt": apply(sp_stats.kurtosis, arr=data_win, axis=1),
    "nzc": apply(ant.num_zerocross, arr=data_win, axis=1),
    "hmob": hmob,
    "hcomp": hcomp,
    # Entropy
    "perm_entropy": apply(ant.perm_entropy, axis=1, arr=data_win, normalize=True),
    "svd_entropy": apply(ant.svd_entropy, 1, data_win, normalize=True),
    "sample_entropy": apply(ant.sample_entropy, 1, data_win),
    "app_entropy": apply(ant.app_entropy, 1, data_win, order=2),
    "spec_entropy": apply(
        ant.spectral_entropy,
        1,
        data_win,
        sf,
        normalize=True,
        method="welch",
        nperseg=50,
    ),
    "lziv": apply(ant.lziv_complexity, 1, data_win),
    # Fractal dimension
    "dfa": apply(ant.detrended_fluctuation, 1, data_win),
    "petrosian": apply(ant.petrosian_fd, 1, data_win),
    "katz": apply(ant.katz_fd, 1, data_win),
    "higuchi": apply(ant.higuchi_fd, 1, data_win),
}


df_feat = pd.DataFrame(df_feat)
df_feat.head()


In [None]:
from scipy.integrate import simps
from scipy.signal import welch

# Estimate power spectral density using Welch's method
freqs, psd = welch(data_win, sf, nperseg=int(4 * sf))


def bandpower_from_psd_ndarray(
    psd,
    freqs,
    bands=[
        (0.5, 4, "Delta"),
        (4, 8, "Theta"),
        (8, 12, "Alpha"),
        (12, 16, "Sigma"),
        (16, 30, "Beta"),
        (30, 40, "Gamma"),
    ],
    relative=True,
):
    """Compute bandpowers in N-dimensional PSD.
    This is a np-only implementation of the :py:func:`yasa.bandpower_from_psd` function,
    which supports 1-D arrays of shape (n_freqs), or N-dimensional arays (e.g. 2-D (n_chan,
    n_freqs) or 3-D (n_chan, n_epochs, n_freqs))
    .. versionadded:: 0.2.0
    Parameters
    ----------
    psd : :py:class:`np.ndarray`
        Power spectral density of data, in uV^2/Hz. Must be a N-D array of shape (..., n_freqs).
        See :py:func:`scipy.signal.welch` for more details.
    freqs : :py:class:`np.ndarray`
        Array of frequencies. Must be a 1-D array of shape (n_freqs,)
    bands : list of tuples
        List of frequency bands of interests. Each tuple must contain the lower and upper
        frequencies, as well as the band name (e.g. (0.5, 4, 'Delta')).
    relative : boolean
        If True, bandpower is divided by the total power between the min and
        max frequencies defined in ``band`` (default 0.5 to 40 Hz).
    Returns
    -------
    bandpowers : :py:class:`np.ndarray`
        Bandpower array of shape *(n_bands, ...)*.
    """
    # Type checks
    assert isinstance(bands, list), "bands must be a list of tuple(s)"
    assert isinstance(relative, bool), "relative must be a boolean"

    # Safety checks
    freqs = np.asarray(freqs)
    psd = np.asarray(psd)
    assert freqs.ndim == 1, "freqs must be a 1-D array of shape (n_freqs,)"
    assert psd.shape[-1] == freqs.shape[-1], "n_freqs must be last axis of psd"

    # Extract frequencies of interest
    all_freqs = np.hstack([[b[0], b[1]] for b in bands])
    fmin, fmax = min(all_freqs), max(all_freqs)
    idx_good_freq = np.logical_and(freqs >= fmin, freqs <= fmax)
    freqs = freqs[idx_good_freq]
    res = freqs[1] - freqs[0]

    # Trim PSD to frequencies of interest
    psd = psd[..., idx_good_freq]

    # Check if there are negative values in PSD
    if (psd < 0).any():
        msg = (
            "There are negative values in PSD. This will result in incorrect "
            "bandpower values. We highly recommend working with an "
            "all-positive PSD. For more details, please refer to: "
            "https://github.com/raphaelvallat/yasa/issues/29"
        )
        logger.warning(msg)

    # Calculate total power
    total_power = simps(psd, dx=res, axis=-1)
    total_power = total_power[np.newaxis, ...]

    # Initialize empty array
    bp = np.zeros((len(bands), *psd.shape[:-1]), dtype=np.float64)

    # Enumerate over the frequency bands
    labels = []
    for i, band in enumerate(bands):
        b0, b1, la = band
        labels.append(la)
        idx_band = np.logical_and(freqs >= b0, freqs <= b1)
        bp[i] = simps(psd[..., idx_band], dx=res, axis=-1)

    if relative:
        bp /= total_power
    return bp


# Compute bandpowers in N-dimensional PSD
bp = bandpower_from_psd_ndarray(psd, freqs)
bp = pd.DataFrame(bp.T, columns=["delta", "theta", "alpha", "sigma", "beta", "gamma"])
df_feat = pd.concat([df_feat, bp], axis=1)
df_feat.head()


In [None]:
# Ratio of spectral power
df_feat.eval("dt = delta / theta", inplace=True)
df_feat.eval("da = delta / alpha", inplace=True)
df_feat.eval("ds = delta / sigma", inplace=True)
df_feat.eval("db = delta / beta", inplace=True)
df_feat.eval("dg = delta / gamma", inplace=True)

df_feat.eval("td = theta / delta", inplace=True)
df_feat.eval("ta = theta / alpha", inplace=True)
df_feat.eval("ts = theta / sigma", inplace=True)
df_feat.eval("tb = theta / beta", inplace=True)
df_feat.eval("tg = theta / gamma", inplace=True)

df_feat.eval("ad = alpha / delta", inplace=True)
df_feat.eval("at = alpha / theta", inplace=True)
df_feat.eval("asi = alpha / sigma", inplace=True)
df_feat.eval("ab = alpha / beta", inplace=True)
df_feat.eval("ag = alpha / gamma", inplace=True)

df_feat.eval("sd = sigma / delta", inplace=True)
df_feat.eval("st = sigma / theta", inplace=True)
df_feat.eval("sa = sigma / alpha", inplace=True)
df_feat.eval("sb = sigma / beta", inplace=True)
df_feat.eval("sg = sigma / gamma", inplace=True)

df_feat.eval("bd = beta / delta", inplace=True)
df_feat.eval("bt = beta / theta", inplace=True)
df_feat.eval("ba = beta / alpha", inplace=True)
df_feat.eval("bs = beta / sigma", inplace=True)
df_feat.eval("bg = beta / gamma", inplace=True)

df_feat.eval("gd = gamma / delta", inplace=True)
df_feat.eval("gt = gamma / theta", inplace=True)
df_feat.eval("ga = gamma / alpha", inplace=True)
df_feat.eval("gs = gamma / sigma", inplace=True)
df_feat.eval("gb = gamma / beta", inplace=True)

df_feat.eval("ta_b = (theta + alpha)/beta", inplace=True)
df_feat.eval("ta_ab = (theta + alpha)/(alpha + beta)", inplace=True)
df_feat.eval("gb_da = (gamma + beta)/(delta + alpha)", inplace=True)

df_feat.head()


# Compute the envelope derivative operator

In [None]:
"""Compute the envelope derivative operator (EDO), as defined in [1].
[1] JM O' Toole, A Temko, NJ Stevenson, “Assessing instantaneous energy in the EEG: a non-negative, frequency-weighted energy operator”, IEEE Int. Conf.  on Eng. in Medicine and Biology, Chicago, August 2014
"""


def discrete_hilbert(x, DBplot=False):
    """Discrete Hilbert transform
    Parameters
    ----------
    x: ndarray
        input signal
    DBplot: bool, optional
        plot or not
    Returns
    -------
    x_hilb : ndarray
        Hilbert transform of x
    """
    N = len(x)
    Nh = np.ceil(N / 2)
    k = np.arange(N)

    # build the Hilbert transform in the frequency domain:
    H = -1j * np.sign(Nh - k) * np.sign(k)
    x_hilb = np.fft.ifft(np.fft.fft(x) * H)
    x_hilb = np.real(x_hilb)

    if DBplot:
        plt.figure(10, clear=True)
        plt.plot(np.imag(H))

    return x_hilb


def edo(x, DBplot=False):
    """Generate Envelope Derivative Operator (EDO) Γ[x(n)] from simple formula in the time domain:
    Γ[x(n)] = y(n)² + H[y(n)]²
    where y(n) is the derivative of x(n) using the central-finite method and H[.] is the
    Hilbert transform.
    Parameters
    ----------
    x: ndarray
        input signal
    DBplot: bool, optional
        plot or not
    Returns
    -------
    x_edo : ndarray
        EDO of x
    """
    # 1. check if odd length and if so make even:
    N_start = len(x)
    if (N_start % 2) != 0:
        x = np.hstack((x, 0))

    N = len(x)
    nl = np.arange(1, N - 1)
    xx = np.zeros(N)

    # 2. calculate the Hilbert transform
    h = discrete_hilbert(x)

    # 3. implement with the central finite difference equation
    xx[nl] = (
        (x[nl + 1] ** 2) + (x[nl - 1] ** 2) + (h[nl + 1] ** 2) + (h[nl - 1] ** 2)
    ) / 4 - ((x[nl + 1] * x[nl - 1] + h[nl + 1] * h[nl - 1]) / 2)

    # trim and zero-pad and the ends:
    x_edo = np.pad(xx[2 : (len(xx) - 2)], (2, 2), "constant", constant_values=(0, 0))

    if DBplot:
        plt.figure(2, clear=True)
        (hl1,) = plt.plot(x, label="test signal")
        (hl2,) = plt.plot(x_edo, label="EDO")
        plt.legend(handles=[hl1, hl2], loc="upper right")
        plt.pause(0.0001)

    return x_edo[0:N_start]


def test_edo_random():
    """Test EDO with a random signal"""

    DBplot = True
    x = np.random.randn(102)
    x_e = edo(x)

    # -------------------------------------------------------------------
    # plot
    # -------------------------------------------------------------------
    if DBplot:
        plt.figure(2, clear=True)
        (hl1,) = plt.plot(x, label="test signal")
        (hl2,) = plt.plot(x_e, label="EDO")
        plt.legend(handles=[hl1, hl2], loc="upper right")
        plt.pause(0.0001)


""" General_nleo: ''General'' Non-Linear Energy Operator (NLEO) expression: 
Ψ(n)=x(n-l)x(n-p)-x(n-q)x(n-s) for l+p=q+s  (and [l,p]≠[q,s], otherwise Ψ(n)=0)
"""


def gen_nleo(x, l=1, p=2, q=0, s=3):
    """general form of the nonlinear energy operator (NLEO)
    General NLEO expression: Ψ(n) = x(n-l)x(n-p) - x(n-q)x(n-s)
    for l+p=q+s  (and [l,p]≠[q,s], otherwise Ψ(n)=0)
    Parameters
    ----------
    x: ndarray
        input signal
    l: int, optional
        parameter of NLEO expression (see above)
    p: int, optional
        parameter of NLEO expression (see above)
    q: int, optional
        parameter of NLEO expression (see above)
    s: int, optional
        parameter of NLEO expression (see above)
    Returns
    -------
    x_nleo : ndarray
        NLEO array
    Example
    -------
    import np as np
    # generate test signal
    N = 256
    n = np.arange(N)
    w1 = np.pi / (N / 32)
    ph1 = -np.pi + 2 * np.pi * np.random.rand(1)
    a1 = 1.3
    x1 = a1 * np.cos(w1 * n + ph1)
    # compute instantaneous energy:
    x_nleo = gen_nleo(x1, 1, 2, 0, 3)
    # plot:
    plt.figure(1, clear=True)
    plt.plot(x1, '-o', label='test signal')
    plt.plot(x_nleo, '-o', label='Agarwal-Gotman')
    plt.legend(loc='upper left')
    """
    # check parameters:
    if (l + p) != (q + s) and any(np.sort((l, p)) != np.sort((q, s))):
        warning("Incorrect parameters for NLEO. May be zero!")

    N = len(x)
    x_nleo = np.zeros(N)

    iedges = abs(l) + abs(p) + abs(q) + abs(s)
    n = np.arange(iedges + 1, (N - iedges - 1))

    x_nleo[n] = x[n - l] * x[n - p] - x[n - q] * x[n - s]

    return x_nleo


def nleo(x, type="teager"):
    """generate different NLEOs based on the same operator
    Parameters
    ----------
    x: ndarray
        input signal
    type: {'teager', 'agarwal', 'palmu', 'abs_teager', 'env_only'}
        which type of NLEO?

    Returns
    -------
    x_nleo : ndarray
        NLEO array


    Additional Notes
    ----------------
    {'teager': 'Teager-Kaiser', 'agarwal': 'Agarwal-Gotman', 'palmu': 'Palmu et.al.'}

    """

    def teager():
        return gen_nleo(x, 0, 0, 1, -1)

    def agarwal():
        return gen_nleo(x, 1, 2, 0, 3)

    def palmu():
        return abs(gen_nleo(x, 1, 2, 0, 3))

    def abs_teager():
        return abs(gen_nleo(x, 0, 0, 1, -1))

    def env_only():
        return abs(x) ** 2

    def default_nleo():
        # -------------------------------------------------------------------
        # default option
        # -------------------------------------------------------------------
        print("Invalid NLEO name; defaulting to Teager")
        return teager()

    # pick which function to execute
    which_nleo = {
        "teager": teager,
        "agarwal": agarwal,
        "palmu": palmu,
        "abs_teager": abs_teager,
        "env_only": env_only,
    }

    def get_nleo(name):
        return which_nleo.get(name, default_nleo)()

    x_nleo = get_nleo(type)
    return x_nleo


def test_compare_nleos(x=None, DBplot=True):
    """test all NLEO variants with 1 signal
    Parameters
    ----------
    x: ndarray, optional
        input signal (defaults to coloured Gaussian noise)
    DBplot: bool
        plot or not
    """
    if x is None:
        N = 128
        x = np.cumsum(np.random.randn(N))

    all_methods = ["teager", "agarwal", "palmu"]
    all_methods_strs = {
        "teager": "Teager-Kaiser",
        "agarwal": "Agarwal-Gotman",
        "palmu": "Palmu et.al.",
    }
    x_nleo = dict.fromkeys(all_methods)

    for n in all_methods:
        x_nleo[n] = nleo(x, n)

    if DBplot:
        fig, ax = plt.subplots(nrows=2, ncols=1, num=4, clear=True)
        ax[0].plot(x, "-o", label="test signal")
        for n in all_methods:
            ax[1].plot(x_nleo[n], "-o", label=all_methods_strs[n])
        ax[0].legend(loc="upper right")
        ax[1].legend(loc="upper left")
        plt.pause(0.0001)


In [None]:
def energy_operators_from_signal_ndarray(
    x, ops=["teager", "agarwal", "palmu", "abs_teager", "env_only", "edo"]
):
    def teager():
        return nleo(x, "teager")

    def agarwal():
        return nleo(x, "agarwal")

    def palmu():
        return nleo(x, "palmu")

    def abs_teager():
        return nleo(x, "abs_teager")

    def env_only():
        return nleo(x, "env_only")

    def edo_f():
        return edo(x)

    def default_eo():
        # -------------------------------------------------------------------
        # default option
        # -------------------------------------------------------------------
        print("Invalid EO name; defaulting to Teager")
        return teager()

    which_energy_operator = {
        "teager": teager,
        "agarwal": agarwal,
        "palmu": palmu,
        "abs_teager": abs_teager,
        "env_only": env_only,
        "edo": edo_f,
    }

    def get_energy_operator(name):
        return which_energy_operator.get(name, default_eo)()

    feat = np.zeros((x.shape[0], len(ops)))

    for i, op in enumerate(ops):
        x_nleo = get_energy_operator(op)  # Function
        feat[:, i] = x_nleo

    return feat


data_win_mean = apply(np.mean, axis=1, arr=data_win)
featEnergy = energy_operators_from_signal_ndarray(
    data_win_mean, ops=["teager", "agarwal", "palmu", "abs_teager", "env_only", "edo"]
)
featEnergy = pd.DataFrame(
    featEnergy, columns=["teager", "agarwal", "palmu", "abs_teager", "env_only", "edo"]
)
df_feat = pd.concat([df_feat, featEnergy], axis=1)
df_feat.head()


In [None]:
# Write feature object to a comma-separated values (csv) file
df_feat.to_csv(f"feature/{fname} {lr}.csv", index=False)


In [None]:
# Load feature object as a dataframe
df_feat = pd.read_csv(f"feature/{fname} {lr}.csv", index_col=False)


# Lets add some new features

In [None]:
def hjorth_activity(x):
    """Column-wise computation of Hjorth activity (variance)."""
    return np.var(x, axis=0)


def hjorth_mobility(x):
    """Column-wise computation of Hjorth mobility"""
    return np.sqrt(np.var(np.gradient(x, axis=0), axis=0) / np.var(x, axis=0))


def hjorth_complexity(x):
    """Column-wise computation of Hjorth complexity"""
    return hjorth_mobility(np.gradient(x, axis=0)) / hjorth_mobility(x)


data = data_cosine(N=1024, A=0.1, sampling=1024, freq=200)
hjorth_act = hjorth_activity(data)
hjorth_com = hjorth_complexity(data)
hjorth_mob = hjorth_mobility(data)


# Energy 

In [None]:
# Energy (E) of the signal is the sum of the squares of amplitude
def energy_fn(x):
    x /= np.max(x)
    return np.mean(x**2)


E = np.apply_along_axis(energy_fn, 1, data_win)
df_feat["E"] = E


# Frequency Domain Features 

In [None]:
def bandpower_from_psd_ndarray(
    psd,
    freqs,
    bands=[
        (0.5, 4, "Delta"),
        (4, 8, "Theta"),
        (8, 12, "Alpha"),
        (12, 16, "Sigma"),
        (16, 30, "Beta"),
        (30, 40, "Gamma"),
    ],
    relative=True,
):
    """Compute bandpowers in N-dimensional PSD.
    This is a np-only implementation of the :py:func:`yasa.bandpower_from_psd` function,
    which supports 1-D arrays of shape (n_freqs), or N-dimensional arays (e.g. 2-D (n_chan,
    n_freqs) or 3-D (n_chan, n_epochs, n_freqs))
    .. versionadded:: 0.2.0
    Parameters
    ----------
    psd : :py:class:`np.ndarray`
        Power spectral density of data, in uV^2/Hz. Must be a N-D array of shape (..., n_freqs).
        See :py:func:`scipy.signal.welch` for more details.
    freqs : :py:class:`numpy.ndarray`
        Array of frequencies. Must be a 1-D array of shape (n_freqs,)
    bands : list of tuples
        List of frequency bands of interests. Each tuple must contain the lower and upper
        frequencies, as well as the band name (e.g. (0.5, 4, 'Delta')).
    relative : boolean
        If True, bandpower is divided by the total power between the min and
        max frequencies defined in ``band`` (default 0.5 to 40 Hz).
    Returns
    -------
    bandpowers : :py:class:`numpy.ndarray`
        Bandpower array of shape *(n_bands, ...)*.
    """
    # Type checks
    assert isinstance(bands, list), "bands must be a list of tuple(s)"
    assert isinstance(relative, bool), "relative must be a boolean"

    # Safety checks
    freqs = np.asarray(freqs)
    psd = np.asarray(psd)
    assert freqs.ndim == 1, "freqs must be a 1-D array of shape (n_freqs,)"
    assert psd.shape[-1] == freqs.shape[-1], "n_freqs must be last axis of psd"

    # Extract frequencies of interest
    all_freqs = np.hstack([[b[0], b[1]] for b in bands])
    fmin, fmax = min(all_freqs), max(all_freqs)
    idx_good_freq = np.logical_and(freqs >= fmin, freqs <= fmax)
    freqs = freqs[idx_good_freq]
    res = freqs[1] - freqs[0]

    # Trim PSD to frequencies of interest
    psd = psd[..., idx_good_freq]

    # Check if there are negative values in PSD
    if (psd < 0).any():
        msg = (
            "There are negative values in PSD. This will result in incorrect "
            "bandpower values. We highly recommend working with an "
            "all-positive PSD. For more details, please refer to: "
            "https://github.com/raphaelvallat/yasa/issues/29"
        )
        logger.warning(msg)

    # Calculate total power
    total_power = simps(psd, dx=res, axis=-1)
    total_power = total_power[np.newaxis, ...]

    # Initialize empty array
    bp = np.zeros((len(bands), *psd.shape[:-1]), dtype=np.float64)

    # Enumerate over the frequency bands
    labels = []
    for i, band in enumerate(bands):
        b0, b1, la = band
        labels.append(la)
        idx_band = np.logical_and(freqs >= b0, freqs <= b1)
        bp[i] = simps(psd[..., idx_band], dx=res, axis=-1)

    if relative:
        bp /= total_power
    return bp


In [None]:
# for x in range(1, 10):
#     plt.plot(freqs, psd[int(x), :])


In [None]:
from scipy.integrate import simps
from scipy.signal import welch

# Estimate power spectral density using Welch's method
freqs, psd = welch(data_win, sf, nperseg=int(4 * sf))

# Compute features
## Compute featrues for normal singal (to compare w/ psd later)
hmob, hcomp = ant.hjorth_params(data_win, axis=1)
std_nor = np.apply_along_axis(np.std, 1, data_win, ddof=1)
mean_nor = np.apply_along_axis(np.mean, 1, data_win)
median_nor = np.apply_along_axis(np.median, 1, data_win)
iqr_nor = np.apply_along_axis(sp_stats.iqr, 1, data_win, rng=(25, 75))
skew_nor = np.apply_along_axis(sp_stats.skew, 1, data_win)
kurt_nor = np.apply_along_axis(sp_stats.kurtosis, 1, data_win)
hmob_nor = hmob
hcomp_nor = hcomp

## Compute featrues for PSD
hmob, hcomp = ant.hjorth_params(psd, axis=1)
std_psd = np.apply_along_axis(np.std, 1, psd, ddof=1)
mean_psd = np.apply_along_axis(np.mean, 1, psd)
median_psd = np.apply_along_axis(np.median, 1, psd)
iqr_psd = np.apply_along_axis(sp_stats.iqr, 1, psd, rng=(25, 75))
skew_psd = np.apply_along_axis(sp_stats.skew, 1, psd)
kurt_psd = np.apply_along_axis(sp_stats.kurtosis, 1, psd)
hmob_psd = hmob
hcomp_psd = hcomp

# Add features to features dataframe
df_feat["E"] = E
df_feat["std_psd"] = std_psd
df_feat["mean_psd"] = mean_psd
df_feat["iqr_psd"] = iqr_psd
df_feat["skew_psd"] = skew_psd
df_feat["kurt_psd"] = kurt_psd
df_feat["hmob_psd"] = hmob_psd
df_feat["hcomp_psd"] = hcomp_psd


In [None]:
def normalize(v):
    norm = np.linalg.norm(v)
    if norm == 0:
        return v
    return v / norm


In [None]:
plt.figure(figsize=(8, 6))
plt.plot(normalize(std_psd), label="PSD STD")
plt.plot(normalize(std_nor), label="Signal STD")
plt.xlabel("Epoch")
plt.ylabel("STD")
plt.title("Standard Deviation from singal and it's PSD [P8_N3 L]")
plt.legend()
plt.tight_layout()
# plt.savefig("std.png", format="png")
# plt.savefig("std.svg", format="svg")
plt.show()


In [None]:
plt.figure(figsize=(8, 6))
plt.plot(normalize(mean_psd), label="PSD mean")
plt.plot(normalize(mean_nor), label="Signal mean")
plt.xlabel("Epoch")
plt.ylabel("Mean")
plt.title("Mean from singal and it's PSD [P8_N3 L]")
plt.legend()
plt.tight_layout()
# plt.savefig("mean.svg", format="svg")
# plt.savefig("mean.png", format="png")
plt.show()


In [None]:
plt.figure(figsize=(8, 6))
plt.plot(normalize(median_psd), label="PSD median")
plt.plot(normalize(median_nor), label="Signal median")
plt.xlabel("Epoch")
plt.ylabel("Median")
plt.title("Median from singal and it's PSD [P8_N3 L]")
plt.legend()
plt.tight_layout()
# plt.savefig("median.svg", format="svg")
# plt.savefig("median.png", format="png")
plt.show()


In [None]:
plt.figure(figsize=(8, 6))
plt.plot(normalize(iqr_psd), label="PSD iqr")
plt.plot(normalize(iqr_nor), label="Signal iqr")
plt.xlabel("Epoch")
plt.ylabel("IQR")
plt.title("IQR from singal and it's PSD [P8_N3 L]")
plt.legend()
plt.tight_layout()
# plt.savefig("iqr.svg", format="svg")
# plt.savefig("iqr.png", format="png")
plt.show()


In [None]:
plt.figure(figsize=(8, 6))
plt.plot(normalize(skew_psd), label="PSD skew")
plt.plot(normalize(skew_nor), label="Signal skew")
plt.xlabel("Epoch")
plt.ylabel("Skewness")
plt.title("Skewness from singal and it's PSD [P8_N3 L]")
plt.legend()
plt.tight_layout()
# plt.savefig("skew.svg", format="svg")
# plt.savefig("skew.png", format="png")
plt.show()


In [None]:
plt.figure(figsize=(8, 6))
plt.plot(normalize(kurt_psd), label="PSD kurt")
plt.plot(normalize(kurt_nor), label="Signal kurt")
plt.xlabel("Epoch")
plt.ylabel("Kurtosis")
plt.title("Kurtosis from singal and it's PSD [P8_N3 L]")
plt.legend()
plt.tight_layout()
# plt.savefig("kurt.svg", format="svg")
# plt.savefig("kurt.png", format="png")
plt.show()


In [None]:
plt.figure(figsize=(8, 6))
plt.plot(normalize(hmob_psd), label="PSD hmob")
plt.plot(normalize(hmob_nor), label="Signal hmob")
plt.xlabel("Epoch")
plt.ylabel("Hjorth Mobility")
plt.title("Hjorth Mobility from singal and it's PSD [P8_N3 L]")
plt.legend()
plt.tight_layout()
# plt.savefig("hjorth_mob.svg", format="svg")
# plt.savefig("hjorth_mob.png", format="png")
plt.show()


In [None]:
plt.figure(figsize=(8, 6))
plt.plot(normalize(hcomp_psd), label="PSD hcomp")
plt.plot(normalize(hcomp_nor), label="Signal hcomp")
plt.xlabel("Epoch")
plt.ylabel("Hjorth Complexity")
plt.title("Hjorth Complexity from singal and it's PSD [P8_N3 L]")
plt.legend()
plt.tight_layout()
# plt.savefig("hjorth_comp.svg", format="svg")
# plt.savefig("hjorth_comp.png", format="png")
plt.show()


# Wavelet Energy 

In [None]:
def calc_wavelet_energy(data_set):
    """
    Input : 1 * N vector
    Output: Float with the wavelet energy of the input vector,
    rounded to 3 decimal places.
    """
    # p_sqr = [i ** 2 for i in data_set]
    wavelet_energy = np.nansum(np.log2(np.square(data_set)))
    return round(wavelet_energy, 3)


wavelet_energy = np.apply_along_axis(calc_wavelet_energy, 1, data_win)

# Add features to features dataframe
df_feat["WEn"] = wavelet_energy


# Hurst Coefficients 

In [None]:
import math, sys


def __to_inc(x):
    incs = x[1:] - x[:-1]
    return incs


def __to_pct(x):
    pcts = x[1:] / x[:-1] - 1.0
    return pcts


def __get_RS(series, kind):
    """
    Get rescaled range (using the range of cumulative sum
    of deviations instead of the range of a series as in the simplified version
    of R/S) from a time-series of values.
    Parameters
    ----------
    series : array-like
        (Time-)series
    kind : str
        The kind of series (refer to compute_Hc docstring)
    """

    if kind == "random_walk":
        incs = __to_inc(series)
        mean_inc = (series[-1] - series[0]) / len(incs)
        deviations = incs - mean_inc
        Z = np.cumsum(deviations)
        R = max(Z) - min(Z)
        S = np.std(incs, ddof=1)

    elif kind == "price":
        incs = __to_pct(series)
        mean_inc = np.sum(incs) / len(incs)
        deviations = incs - mean_inc
        Z = np.cumsum(deviations)
        R = max(Z) - min(Z)
        S = np.std(incs, ddof=1)

    elif kind == "change":
        incs = series
        mean_inc = np.sum(incs) / len(incs)
        deviations = incs - mean_inc
        Z = np.cumsum(deviations)
        R = max(Z) - min(Z)
        S = np.std(incs, ddof=1)

    if R == 0 or S == 0:
        return 0  # return 0 to skip this interval due undefined R/S

    return R / S


def __get_simplified_RS(series, kind):
    """
    Simplified version of rescaled range
    Parameters
    ----------
    series : array-like
        (Time-)series
    kind : str
        The kind of series (refer to compute_Hc docstring)
    """

    if kind == "random_walk":
        incs = __to_inc(series)
        R = max(series) - min(series)  # range in absolute values
        S = np.std(incs, ddof=1)
    elif kind == "price":
        pcts = __to_pct(series)
        R = max(series) / min(series) - 1.0  # range in percent
        S = np.std(pcts, ddof=1)
    elif kind == "change":
        incs = series
        _series = np.hstack([[0.0], np.cumsum(incs)])
        R = max(_series) - min(_series)  # range in absolute values
        S = np.std(incs, ddof=1)

    if R == 0 or S == 0:
        return 0  # return 0 to skip this interval due the undefined R/S ratio

    return R / S


def compute_Hc(
    series, kind="random_walk", min_window=10, max_window=None, simplified=True
):
    """
    Compute H (Hurst exponent) and C according to Hurst equation:
    E(R/S) = c * T^H
    Refer to:
    https://en.wikipedia.org/wiki/Hurst_exponent
    https://en.wikipedia.org/wiki/Rescaled_range
    https://en.wikipedia.org/wiki/Random_walk
    Parameters
    ----------
    series : array-like
        (Time-)series
    kind : str
        Kind of series
        possible values are 'random_walk', 'change' and 'price':
        - 'random_walk' means that a series is a random walk with random increments;
        - 'price' means that a series is a random walk with random multipliers;
        - 'change' means that a series consists of random increments
            (thus produced random walk is a cumulative sum of increments);
    min_window : int, default 10
        the minimal window size for R/S calculation
    max_window : int, default is the length of series minus 1
        the maximal window size for R/S calculation
    simplified : bool, default True
        whether to use the simplified or the original version of R/S calculation
    Returns tuple of
        H, c and data
        where H and c — parameters or Hurst equation
        and data is a list of 2 lists: time intervals and R/S-values for correspoding time interval
        for further plotting log(data[0]) on X and log(data[1]) on Y
    """

    if len(series) < 100:
        raise ValueError("Series length must be greater or equal to 100")

    ndarray_likes = [np.ndarray]
    if "pandas.core.series" in sys.modules.keys():
        ndarray_likes.append(pd.core.series.Series)

    # convert series to np array if series is not np array or pandas Series
    if type(series) not in ndarray_likes:
        series = np.array(series)

    if (
        "pandas.core.series" in sys.modules.keys()
        and type(series) == pd.core.series.Series
    ):
        if series.isnull().values.any():
            raise ValueError("Series contains NaNs")
        series = series.values  # convert pandas Series to np array
    elif np.isnan(np.min(series)):
        raise ValueError("Series contains NaNs")

    if simplified:
        RS_func = __get_simplified_RS
    else:
        RS_func = __get_RS

    err = np.geterr()
    np.seterr(all="raise")

    max_window = max_window or len(series) - 1
    window_sizes = list(
        map(
            lambda x: int(10**x),
            np.arange(math.log10(min_window), math.log10(max_window), 0.25),
        )
    )
    window_sizes.append(len(series))

    RS = []
    for w in window_sizes:
        rs = []
        for start in range(0, len(series), w):
            if (start + w) > len(series):
                break
            _ = RS_func(series[start : start + w], kind)
            if _ != 0:
                rs.append(_)
        RS.append(np.mean(rs))

    A = np.vstack([np.log10(window_sizes), np.ones(len(RS))]).T
    H, c = np.linalg.lstsq(A, np.log10(RS), rcond=-1)[0]
    np.seterr(**err)

    c = 10**c
    return H, c  # , [window_sizes, RS]


# H, c, [window_sizes, RS] = compute_Hc(data_win[0,:])


In [None]:
Hurst_coeffs = np.apply_along_axis(compute_Hc, 1, data_win, kind="random_walk")
Hurst_H1 = Hurst_coeffs[:, 0]
Hurst_C1 = Hurst_coeffs[:, 1]
Hurst_coeffs = np.apply_along_axis(compute_Hc, 1, data_win, kind="change")
Hurst_H2 = Hurst_coeffs[:, 0]
Hurst_C2 = Hurst_coeffs[:, 1]


## Outlier Detection

In [None]:
import collections
import numpy as np
import scipy.stats as stat
from scipy.stats import iqr as IQR


class Outlier:
    """
    Find outlier in a numerical dataset with two different methods:
        - `sd_outlier`: z-score based method
        - `IQR_outlier`: IQR based method
    Also allows to remove/filter-out the detected outliers with `filter` method.
    `plot` method allows you to plot the original and filtered dataset and inspect the performance.
    """

    def __init__(self, x=None):
        self.x = x
        self.outliers = None
        self.outliersIndices = np.array([])
        self.x_filt = None

    def sd_outlier(self=None, x=None, axis=None, bar=3, side="both"):
        """
        z-score based method
        This method will test if the numbers falls outside the three standard deviations.
        Based on this rule, if the value is outlier, the method will return true, if not, return false.
        """

        assert side in ["gt", "lt", "both"], "Side should be `gt`, `lt` or `both`."

        if (x is None) and (self.x is not None):
            x = self.x
        elif (x is None) and (self.x is None):
            raise ValueError("Enter x input!")

        d_z = stat.zscore(x, axis=axis)

        if side == "gt":
            self.outliers = d_z > bar
            return d_z > bar
        elif side == "lt":
            self.outliers = d_z < -bar
            return d_z < -bar
        elif side == "both":
            self.outliers = np.abs(d_z) > bar
            return np.abs(d_z) > bar

    def __Q1(self, x, axis=None):
        if (x is None) and (self.x is not None):
            x = self.x
        elif (x is None) and (self.x is None):
            raise ValueError("Enter x input!")

        return np.percentile(x, 25, axis=axis)

    def __Q3(self, x, axis=None):
        if (x is None) and (self.x is not None):
            x = self.x
        elif (x is None) and (self.x is None):
            raise ValueError("Enter x input!")

        return np.percentile(x, 75, axis=axis)

    def IQR_outlier(self, x=None, axis=None, bar=1.5, side="both"):
        """
        IQR based method
        This method will test if the value is less than q1 - 1.5 * iqr or
        greater than q3 + 1.5 * iqr.
        """
        self.method = "IQR_outlier"

        assert side in ["gt", "lt", "both"], "Side should be `gt`, `lt` or `both`."

        if (x is None) and (self.x is not None):
            x = self.x
        elif (x is None) and (self.x is None):
            raise ValueError("Enter x input!")

        d_IQR = IQR(x, axis=axis)
        d_Q1 = self.__Q1(x, axis=axis)
        d_Q3 = self.__Q3(x, axis=axis)
        IQR_distance = np.multiply(d_IQR, bar)

        stat_shape = list(x.shape)

        if isinstance(axis, collections.Iterable):
            for single_axis in axis:
                stat_shape[single_axis] = 1
        else:
            stat_shape[axis] = 1

        if side in ["gt", "both"]:
            upper_range = d_Q3 + IQR_distance
            upper_outlier = np.greater(x - upper_range.reshape(stat_shape), 0)
        if side in ["lt", "both"]:
            lower_range = d_Q1 - IQR_distance
            lower_outlier = np.less(x - lower_range.reshape(stat_shape), 0)

        if side == "gt":
            self.outliers = upper_outlier
            return upper_outlier
        if side == "lt":
            self.outliers = lower_outlier
            return lower_outlier
        if side == "both":
            self.outliers = np.logical_or(upper_outlier, lower_outlier)
            return np.logical_or(upper_outlier, lower_outlier)

    def filter(self, x=None):
        if (x is None) and (self.x is not None):
            x = self.x
        elif (x is None) and (self.x is None):
            raise ValueError("Enter x input!")

        self.outliersIndices = np.where(self.outliers == True)
        print(f"Outliers are detected in {len(self.outliersIndices[0])} points.")
        self.x_filt = np.copy(x)
        self.x_filt[self.outliersIndices] = np.mean(x[~self.outliers])
        return self.x_filt, self.outliersIndices[0]

    def plot(self, plot_original=False):
        # Plot the signal and detected outliers
        plt.figure()

        if plot_original:
            # plt.plot(np.asarray(self.x), "ok", label="Orginal Signal")
            plt.plot(np.asarray(self.x), "-k", linewidth=7, label="Orginal Signal")

        for outlier in self.outliersIndices[0]:
            plt.axvline(outlier, color="red", linestyle="--", alpha=0.5, linewidth=4)

        if plot_original:
            plt.plot(filtered, "-", c="cyan", linewidth=1, label="Filtered Signal")
        else:
            plt.plot(filtered, "-", c="blue", linewidth=1, label="Filtered Signal")

        plt.legend()
        plt.tight_layout()
        plt.show()


# Test
# Finally, if you want to filter out the outliers, use a numpy selector.
x = [2, 3, 1, 4, 2, 3, 4, 5, 2, 3, 3, 3, 3, 4, 3, 2, 50, 60]  # data
outlier = Outlier(np.asarray(x))
outlier.IQR_outlier(axis=0, bar=1.5, side="both")
filtered, outlierIndices = outlier.filter()
outlier.plot(plot_original=True)


In [None]:
# detect and remove outliers from Hurst coefficients
outlier = Outlier(np.asarray(Hurst_H1))
outlier.IQR_outlier(axis=0, bar=1.5, side="both")
filtered, outlierIndices = outlier.filter()
outlier.plot(plot_original=True)
Hurst_H1 = filtered


In [None]:
# detect and remove outliers from Hurst coefficients
outlier = Outlier(np.asarray(Hurst_C1))
outlier.IQR_outlier(axis=0, bar=1.5, side="both")
filtered, outlierIndices = outlier.filter()
outlier.plot(plot_original=True)
Hurst_C1 = filtered


In [None]:
# detect and remove outliers from Hurst coefficients
outlier = Outlier(np.asarray(Hurst_C2))
outlier.IQR_outlier(axis=0, bar=1.5, side="both")
filtered, outlierIndices = outlier.filter()
outlier.plot(plot_original=True)
Hurst_C2 = filtered


In [None]:
# Add features to features dataframe
df_feat["Hurst_H1"] = Hurst_H1
df_feat["Hurst_H2"] = Hurst_H2
df_feat["Hurst_C1"] = Hurst_C1
df_feat["Hurst_C2"] = Hurst_C2


In [None]:
def embed_seq(time_series, tau, embedding_dimension):
    """Build a set of embedding sequences from given time series `time_series`
    with lag `tau` and embedding dimension `embedding_dimension`.
    Let time_series = [x(1), x(2), ... , x(N)], then for each i such that
    1 < i <  N - (embedding_dimension - 1) * tau,
    we build an embedding sequence,
    Y(i) = [x(i), x(i + tau), ... , x(i + (embedding_dimension - 1) * tau)].
    All embedding sequences are placed in a matrix Y.
    Parameters
    ----------
    time_series
        list or numpy.ndarray
        a time series
    tau
        integer
        the lag or delay when building embedding sequence
    embedding_dimension
        integer
        the embedding dimension
    Returns
    -------
    Y
        2-embedding_dimension list
        embedding matrix built
    Examples
    ---------------
    >>> import pyeeg
    >>> a=range(0,9)
    >>> pyeeg.embed_seq(a,1,4)
    array([[0,  1,  2,  3],
           [1,  2,  3,  4],
           [2,  3,  4,  5],
           [3,  4,  5,  6],
           [4,  5,  6,  7],
           [5,  6,  7,  8]])
    >>> pyeeg.embed_seq(a,2,3)
    array([[0,  2,  4],
           [1,  3,  5],
           [2,  4,  6],
           [3,  5,  7],
           [4,  6,  8]])
    >>> pyeeg.embed_seq(a,4,1)
    array([[0],
           [1],
           [2],
           [3],
           [4],
           [5],
           [6],
           [7],
           [8]])
    """
    if not type(time_series) == np.ndarray:
        typed_time_series = np.asarray(time_series)
    else:
        typed_time_series = time_series

    shape = (
        typed_time_series.size - tau * (embedding_dimension - 1),
        embedding_dimension,
    )

    strides = (typed_time_series.itemsize, tau * typed_time_series.itemsize)

    return np.lib.stride_tricks.as_strided(
        typed_time_series, shape=shape, strides=strides
    )


# Mean Distance (MD) and Central Tendency Measure (CTM)

In [None]:
def calc_mean_and_ctm(X, Y):
    # features = pd.DataFrame(columns=['radius','mean_distance','central_tendency_measure'])
    r = 0.5
    d = [math.sqrt(X[i] * X[i] + Y[i] * Y[i]) for i in range(0, len(X))]
    delta = [1 if i < r else 0 for i in d]
    d = [i for i in d if i < r]

    ctm = np.sum(delta[:-2]) / (len(delta) - 2)
    mean_distance = np.mean(d)

    # features.loc[0] = [r] + [ctm] + [mean_distance]
    return r, ctm, mean_distance


#### Example
y = np.random.randn(7680) * 10 + 100
# Second Order Difference Plot (SODP)
upper_quartile = np.percentile(y, 80)
lower_quartile = np.percentile(y, 20)
IQR = (upper_quartile - lower_quartile) * 1.5
quartileSet = (lower_quartile - IQR, upper_quartile + IQR)
y = y[np.where((y >= quartileSet[0]) & (y <= quartileSet[1]))]
# Plotting SODP
X = np.subtract(y[1:], y[0:-1])  # x(n+1)-x(n)
Y = np.subtract(y[2:], y[0:-2]).tolist()  # x(n+2)-x(n-1)
Y.extend([0])
# plt.figure(figsize=(8, 5))
# plt.plot(X, Y, ".")
# Calculate MD and CTM
_, mean_distance, central_tendency_measure = calc_mean_and_ctm(X, Y)


def mean_ctm_wrapper(x):
    """
    A wrapper function for calc_mean_and_ctm().
    This function calculates mean and central tendancy measure for a given time series `x`.

    Parameters
    ----------
    x : :py:class:`np.ndarray`
        Array of time series data. Must be a 1-D array of shape `(dataPoints,)`

    Returns
    -------
    Tuple of `mean_distance` and `central_tendency_measure`

    Example
    -------
        >>> y = np.random.randn(7680)*10 + 100
        >>> md, ctm = mean_ctm_wrapper(y)
        (0.054281767955801107, 0.33950566436214885)
    """
    upper_quartile = np.percentile(x, 80)
    lower_quartile = np.percentile(x, 20)
    IQR = (upper_quartile - lower_quartile) * 1.5
    quartileSet = (lower_quartile - IQR, upper_quartile + IQR)
    x = x[np.where((x >= quartileSet[0]) & (x <= quartileSet[1]))]
    # plotting SODP
    X = np.subtract(x[1:], x[0:-1])  # x(n+1)-x(n)
    Y = np.subtract(x[2:], x[0:-2]).tolist()  # x(n+2)-x(n-1)
    Y.extend([0])
    # calculate MD and CTM
    _, mean_distance, central_tendency_measure = calc_mean_and_ctm(X, Y)
    return mean_distance, central_tendency_measure


In [None]:
# Calculate feature for all epochs. Then add them to FeaturesDataFrame
mean_ctm = np.apply_along_axis(mean_ctm_wrapper, 1, arr=data_win)
df_feat["mean_distance"] = mean_ctm[:, 0]
df_feat["central_tendency_measure"] = mean_ctm[:, 1]


# Recurrence Plot

In [None]:
#!/usr/bin/python
# coding: UTF-8
from __future__ import division, print_function
import numpy as np
import pylab as plt
from scipy.spatial.distance import pdist, squareform


def rec_plot(s, eps=0.10, steps=10):
    d = pdist(s[:, None])
    d = np.floor(d / eps)
    d[d > steps] = steps
    Z = squareform(d)
    return Z


def moving_average(s, r=5):
    return np.convolve(s, np.ones((r,)) / r, mode="valid")


if __name__ == "__main__":
    # Generate singal
    N = 200
    eps = 0.1
    steps = 10

    # Plot normal dist filtered with moving average
    rn = np.random.normal(size=N)
    rn_filtered = moving_average(rn)

    plt.subplot(211)
    plt.plot(rn_filtered)
    plt.title("Normal")
    plt.subplot(212)
    plt.imshow(rec_plot(rn_filtered, eps=eps, steps=steps))

    plt.show()


# Tsallis and Renyi Entropy

In [None]:
from collections import Counter


class Counter(Counter):
    def prob(self):
        return np.array(list(self.values()))


def symbols_to_prob(symbols):
    """
    Return a dict mapping symbols to  probability.
    input:
    -----
        symbols:     iterable of hashable items
                     works well if symbols is a zip of iterables
    """
    myCounter = Counter(symbols)

    N = float(len(list(symbols)))  # symbols might be a zip object in python 3

    for k in myCounter:
        myCounter[k] /= N

    return myCounter


def entropy(data=None, prob=None, tol=1e-5):
    """
    given a probability distribution (prob) or an interable of symbols (data) compute and
    return its entropy
    inputs:
    ------
        data:       iterable of symbols
        prob:       iterable with probabilities
        tol:        if prob is given, 'entropy' checks that the sum is about 1.
                    It raises an error if abs(sum(prob)-1) >= tol
    """

    if prob is None and data is None:
        raise ValueError(
            "%s.entropy requires either 'prob' or 'data' to be defined" % __name__
        )

    if prob is not None and data is not None:
        raise ValueError(
            "%s.entropy requires only 'prob' or 'data to be given but not both"
            % __name__
        )

    if prob is not None and not isinstance(prob, np.ndarray):
        raise TypeError("'entropy' in '%s' needs 'prob' to be an ndarray" % __name__)

    if prob is not None and abs(prob.sum() - 1) > tol:
        raise ValueError("parameter 'prob' in '%s.entropy' should sum to 1" % __name__)

    if data is not None:
        prob = symbols_to_prob(data).prob()

    # compute the log2 of the probability and change any -inf by 0s
    logProb = np.log2(prob)
    logProb[logProb == -np.inf] = 0

    # return dot product of logProb and prob
    return -float(np.dot(prob, logProb))


def renyi(data=None, a=2):
    if data is not None:
        prob = symbols_to_prob(data).prob()

    # compute the log2 of the probability and change any -inf by 0s
    powerProb = prob ** int(a)
    logProb = np.log(powerProb)
    # return dot product of logProb and prob
    return -(a / (1 - a)) * (np.sum(logProb))


def tsallis(data=None, q=2):
    if data is not None:
        prob = symbols_to_prob(data).prob()

    # compute the log2 of the probability and change any -inf by 0s
    powerProb = prob ** int(q)
    # return dot product of logProb and prob
    return (1 / (q - 1)) * (1 - np.sum(powerProb))


In [None]:
# Calculate feature for all epochs. Then add them to FeaturesDataFrame
data_win_rnd3 = np.around(data_win, decimals=3)
tsallisEnt = np.apply_along_axis(tsallis, 1, arr=data_win_rnd3)
df_feat["tsallisEnt"] = tsallisEnt


In [None]:
# generate a figure compraing tsallis and shannon entropy
Ent = np.apply_along_axis(entropy, 1, arr=data_win_rnd3)

plt.figure(figsize=(8, 6))
plt.plot(
    moving_average(normalize(tsallisEnt - np.mean(tsallisEnt)), 2),
    label="tsallis entropy",
    color="black",
    linewidth=3,
)
plt.plot(
    moving_average(normalize(Ent - np.mean(Ent)), 2),
    label="shannon entropy",
    color="lightgreen",
    linewidth=0.8,
)
plt.xlabel("Epoch")
plt.ylabel("Entropy")
plt.title("Compraing Tsallis and Shannon Entropies")
plt.legend()
plt.tight_layout()
plt.savefig("tsallis_compare.svg", format="svg")
plt.savefig("tsallis_compare.png", format="png")
plt.show()


In [None]:
# Calculate feature for all epochs. Then add them to FeaturesDataFrame
data_win_rnd3 = np.around(data_win, decimals=3)
renyiEnt = np.apply_along_axis(renyi, 1, arr=data_win_rnd3)
df_feat["renyi"] = renyiEnt


In [None]:
# generate a figure compraing renyi and shannon entropy
Ent = np.apply_along_axis(entropy, 1, arr=data_win_rnd3)
renyiEnt = -renyiEnt

plt.figure(figsize=(8, 6))
plt.plot(
    moving_average(normalize(renyiEnt - np.mean(renyiEnt)), 2),
    label="renyi entropy",
    color="black",
    linewidth=3,
)
plt.plot(
    moving_average(normalize(Ent - np.mean(Ent)), 2),
    label="shannon entropy",
    color="cyan",
    linewidth=1,
)
plt.xlabel("Epoch")
plt.ylabel("Entropy")
plt.title("Compraing Renyi and Shannon Entropies")
plt.legend()
plt.tight_layout()
plt.savefig("renyi_compare.svg", format="svg")
plt.savefig("renyi_compare.png", format="png")
plt.show()


# Bubble Entropy

In [None]:
# Manis and Sassi, “A Python Library with Fast Algorithms for Popular Entropy Definitions.”

from numpy import histogram, log


def bubble_count(x):
    """
    counts the number of swaps when sorting
    :param x: the input vector
    :return: the total number of swaps
    """
    y = 0
    for i in range(len(x) - 1, 0, -1):
        for j in range(i):
            if x[j] > x[j + 1]:
                x[j], x[j + 1] = x[j + 1], x[j]
                y += 1
    return y


def complexity_count_fast(x, m):
    """
    :param x: the input series
    :param m: the dimension of the space
    :return: the series of complexities for total number of swaps
    """

    if len(x) < m:
        return []

    y = [bubble_count(x[:m])]
    v = sorted(x[:m])

    for i in range(m, len(x)):
        steps = y[i - m]
        steps -= v.index(x[i - m])
        v.pop(v.index(x[i - m]))
        v.append(x[i])
        j = m - 1
        while j > 0 and v[j] < v[j - 1]:
            v[j], v[j - 1] = v[j - 1], v[j]
            steps += 1
            j -= 1
        y.append(steps)

    return y


def renyi_int(data):
    """
    returns renyi entropy (order 2) of an integer series and bin_size=1
    (specified for the needs of bubble entropy)
    :param data: the input series
    :return: metric
    """
    counter = [0] * (max(data) + 1)
    for x in data:
        counter[x] += 1
    r = 0
    for c in counter:
        p = c / len(data)
        r += p * p
    return -log(r)


def bubble_entropy(x, m=10):
    """
    computes bubble entropy following the definition
    :param x: the input signal
    :param m: the dimension of the embedding space
    :return: metric
    """
    complexity = complexity_count_fast(x, m)
    B = renyi_int(complexity) / log(1 + m * (m - 1) / 2)

    complexity = complexity_count_fast(x, m + 1)
    A = renyi_int(complexity) / log(1 + (m + 1) * m / 2)

    return A - B


def bubble_entropy_2(x, m=10):
    """
    computes bubble entropy following the definition
    :param x: the input signal
    :param m: the dimension of the embedding space
    :return: metric
    """
    complexity = complexity_count_fast(x, m)
    B = renyi_int(complexity) / log(1 + m * (m - 1) / 2)

    complexity = complexity_count_fast(x, m + 2)
    A = renyi_int(complexity) / log(1 + (m + 2) * (m + 1) / 2)

    return A - B


In [None]:
# Calculate feature for all epochs. Then add them to FeaturesDataFrame
data_win_rnd3 = np.around(data_win, decimals=3)
bubbleEnt1 = np.apply_along_axis(bubble_entropy, 1, arr=data_win_rnd3)
df_feat["bubbleEnt1"] = bubbleEnt1

# Calculate feature for all epochs. Then add them to FeaturesDataFrame
data_win_rnd3 = np.around(data_win, decimals=3)
bubbleEnt2 = np.apply_along_axis(bubble_entropy_2, 1, arr=data_win_rnd3)
df_feat["bubbleEnt2"] = bubbleEnt2



In [None]:
fig = plt.figure(figsize=(10, 5))
plt.plot(
    moving_average(normalize(bubbleEnt1 - np.mean(bubbleEnt1)), 2),
    label="Bubble entropy 1",
    color="dodgerblue",
    linewidth=1.5,
)
fig.suptitle("Compraing Bubble Entropy 1 and 2 (Normalized)")
plt.plot(
    moving_average(normalize(bubbleEnt2 - np.mean(bubbleEnt2)), 2),
    label="Bubble entropy 2",
    color="darkorchid",
    linewidth=1.5,
)
plt.xlabel("Epoch")
plt.ylabel("Entropy")
plt.legend()
plt.tight_layout()
plt.savefig("bubble_entropy.svg", format="svg")
plt.savefig("bubble_entropy.png", format="png")
plt.show()

# Differential Entropy

In [None]:
from scipy.stats import differential_entropy

# Calculate feature for all epochs. Then add them to FeaturesDataFrame
data_win_rnd3 = np.around(data_win, decimals=3)
diffEnt = np.apply_along_axis(differential_entropy, 1, arr=data_win_rnd3)
diffEntMean = np.mean(diffEnt[~(diffEnt == -np.inf)])
diffEnt[diffEnt == -np.inf] = diffEntMean
df_feat["diffEnt"] = diffEnt

In [None]:
fig = plt.figure(figsize=(10, 5))
plt.plot(
    moving_average(normalize(diffEnt - np.mean(diffEnt)), 2),
    label="Differential Entropy",
    color="darkgreen",
    linewidth=1.5,
)
plt.title("Differential Entropy (Normalized)")
plt.xlabel("Epoch")
plt.ylabel("Entropy")
plt.legend()
plt.tight_layout()
plt.savefig("differential_entropy.svg", format="svg")
plt.savefig("differential_entropy.png", format="png")
plt.show()

# Fuzzy Entropy

In [None]:
# This cell took ~11 min to execute

# pip install EntropyHub
import EntropyHub as enth


def fuzzEnt_f(x, m=1, tau=1):
    """
    A wrapper function for EntropyHub.FuzzEn() Function

    Input
    ------
    `sig`: Time series signal, a vector of length > 10.
    `m`: Embedding dimension, a positive integer (for embbeding dim).
    `tau`: Time delay, a positive integer (for embbeding dim).
    `Fx`: Type of fuzzy function for distance transformation, one of the following strings
    Return
    ------
    `Fuzz`: Fuzzy entropy estimates for each embedding dimension 1:m.
    `Ps1`: The average fuzzy distances for embedding dimensions 1:m.
    `Ps2`: The average fuzzy distances for embedding dimensions 2:m+1.
    Example
    -------
    >>> [Fuzz, Ps1, Ps2] = enth.FuzzEn(x, m=1, tau=1)
    Source
    ------
    https://github.com/MattWillFlood/EntropyHub/blob/main/Guide/EntropyHub%20Guide.pdf
    """

    [Fuzz, Ps1, Ps2] = enth.FuzzEn(x, m=m, tau=tau)
    return Fuzz[0]


# Calculate feature for all epochs. Then add them to FeaturesDataFrame
data_win_rnd3 = np.around(data_win, decimals=3)
fuzzEnt = np.apply_along_axis(fuzzEnt_f, 1, arr=data_win_rnd3)
df_feat["fuzzEnt"] = fuzzEnt


In [None]:
fig = plt.figure(figsize=(10, 5))
plt.plot(
    moving_average(normalize(fuzzEnt - np.mean(fuzzEnt)), 2),
    label="Fuzzy Entropy",
    color="mediumblue",
    linewidth=1.5,
)
plt.title("Fuzzy Entropy (Normalized)")
plt.xlabel("Epoch")
plt.ylabel("Entropy")
plt.legend()
plt.tight_layout()
plt.savefig("fuzzy_entropy.svg", format="svg")
plt.savefig("fuzzy_entropy.png", format="png")
plt.show()


In [None]:
# Write feature object to a comma-separated values (csv) file
df_feat.to_csv(f"feature/{fname} {lr}.csv", index=False)


In [None]:
df_feat.shape

# Feature Selection

In [None]:
from sklearn.feature_selection import f_classif

# Load hypnogram
location_hypno = "/Users/amirhosseindaraie/Desktop/data/synced-hypnos"
hypno_30s = np.loadtxt(f"{location_hypno}/p8n3_synced.txt")[:, 0]

# Extract sorted F-values
# Compute the ANOVA F-value for the provided sample.
fvals = pd.Series(
    f_classif(X=df_feat, y=hypno_30s)[0], index=df_feat.columns
).sort_values()

# Plot features ranking
plt.figure(figsize=(6, 6))
sns.barplot(y=fvals.index, x=fvals, palette="RdYlGn")
plt.xlabel("F-values")
plt.xticks(rotation=20)
plt.tight_layout()
plt.show()


In [None]:
# Plot hypnogram and a feature
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 6), sharex=True)

hypno = pd.Series(hypno_30s).map({-1: -1, 0: 0, 1: 2, 2: 3, 3: 4, 4: 1}).values
hypno_rem = np.ma.masked_not_equal(hypno, 1)

# Plot the hypnogram
ax1.step(times, -1 * hypno, color="k", lw=1.5)
ax1.step(times, -1 * hypno_rem, color="r", lw=2.5)
ax1.set_yticks([0, -1, -2, -3, -4])
ax1.set_yticklabels(["W", "R", "N1", "N2", "N3"])
ax1.set_ylim(-4.5, 0.5)
ax1.set_ylabel("Sleep stage")

# Plot the non-linear feature
ax2.plot(times, df_feat["perm_entropy"])
ax2.set_ylabel("Permutation Entropy")
# ax2.set_ylabel('Higuchi Fractal Dimension')
ax2.set_xlabel("Time [minutes]")

ax2.set_xlim(0, times[-1])

plt.tight_layout()
plt.show()


# Feature Filtering

In [None]:
def constant_feature_detect(data, threshold=0.98):
    """detect features that show the same value for the
    majority/all of the observations (constant/quasi-constant features)

    Parameters
    ----------
    data : pd.Dataframe
    threshold : threshold to identify the variable as constant

    Returns
    -------
    list of variables names
    """

    data_copy = data.copy(deep=True)
    quasi_constant_feature = []
    for feature in data_copy.columns:
        predominant = (
            (data_copy[feature].value_counts() / np.float(len(data_copy)))
            .sort_values(ascending=False)
            .values[0]
        )
        if predominant >= threshold:
            quasi_constant_feature.append(feature)
    print(len(quasi_constant_feature), " variables are found to be almost constant")
    return quasi_constant_feature


def corr_feature_detect(data, threshold=0.8):
    """detect highly-correlated features of a Dataframe
    Parameters
    ----------
    data : pd.Dataframe
    threshold : threshold to identify the variable correlated

    Returns
    -------
    pairs of correlated variables
    """

    corrmat = data.corr()
    corrmat = corrmat.abs().unstack()  # absolute value of corr coef
    corrmat = corrmat.sort_values(ascending=False)
    corrmat = corrmat[corrmat >= threshold]
    corrmat = corrmat[corrmat < 1]  # remove the digonal
    corrmat = pd.DataFrame(corrmat).reset_index()
    corrmat.columns = ["feature1", "feature2", "corr"]

    grouped_feature_ls = []
    correlated_groups = []

    for feature in corrmat.feature1.unique():
        if feature not in grouped_feature_ls:

            # find all features correlated to a single feature
            correlated_block = corrmat[corrmat.feature1 == feature]
            grouped_feature_ls = (
                grouped_feature_ls
                + list(correlated_block.feature2.unique())
                + [feature]
            )

            # append the block of features to the list
            correlated_groups.append(correlated_block)
    return correlated_groups


# Code by Eamon.Zhang


## Correlation method
Remove features that are highly correlated with each other. We can decide which ones to remove.

In [None]:
corr = corr_feature_detect(data=df_feat, threshold=0.9)
# print all the correlated feature groups!
for i in corr:
    print(i, "\n")



# Feature scaling 
Feature scaling is a method used to standardize the range of independent variables or features of data. In data processing, it is also known as data normalization and is generally performed during the data preprocessing step.

Why Feature Scaling Matters?

If range of inputs varies, in some algorithms, object functions will not work properly.

`Gradient descent` converges much faster with feature scaling done. Gradient descent is a common optimization algorithm used in `logistic regression`, `SVMs`, `neural networks` etc.

Algorithms that involve distance calculation like `KNN`, `Clustering` are also affected by the magnitude of the feature. Just consider how Euclidean distance is calculated: taking the square root of the sum of the squared differences between observations. This distance can be greatly affected by differences in scale among the variables. Variables with large variances have a larger effect on this measure than variables with small variances.

Note: `Tree-based algorithms` are almost the only algorithms that are not affected by the magnitude of the input, as we can easily see from how trees are built. When deciding how to make a split, tree algorithm look for decisions like "whether feature value X>3.0" and compute the purity of the child node after the split, so the scale of the feature does not count.



## Normalization - Standardization (Z-score scaling)	
Removes the mean and scales the data to unit variance.
$$z = (X - X.mean) / std$$
- `pros (+)`: feature is rescaled to have a standard normal distribution that centered around 0 with SD of 1	
- `cons (-)`: compress the observations in the narrow range if the variable is skewed or has outliers, thus impair the predictive power.

🚨 If your feature is not Gaussian like, say, has a skewed distribution or has outliers, Normalization - Standardization is not a good choice as it will compress most data to a narrow range. However, we can transform the feature into Gaussian like and then use Normalization - Standardization.

💡 When performing distance or covariance calculation (algorithm like Clustering, PCA and LDA), it is better to use Normalization - Standardization as it will remove the effect of scales on variance and covariance


In [None]:
def standardization_f(df):
    """
    Example
    -------
    >>> df = pd.DataFrame( {"col1": [1, 3, 5, 7, 9], "col2": [7, 4, 35, 14, 56]} )
    >>> df = standardization_f(df)
    """

    from sklearn.preprocessing import StandardScaler

    # define standard scaler
    scaler = StandardScaler()

    # transform data
    df_scaled = scaler.fit_transform(df)

    return pd.DataFrame(df_scaled, columns=df.columns)


## Min-Max scaling	
Transforms features by scaling each feature to a given range. Default is `feature_range` to [0,1].
$$X_{scaled} = (X - X.min) / (X.max - X.min)$$
- `cons (-)`: Compress the observations in the narrow range if the variable is skewed or has outliers, thus impair the predictive power.

🚨 Min-Max scaling has the same drawbacks as Normalization - Standardization, and also new data may not be bounded to [0,1] as they can be out of the original range. Some algorithms, for example some deep learning network prefer input on a 0-1 scale so this is a good choice.



In [None]:
def minMaxScaler_f(df, feature_range=(0, 1)):
    """
    Example
    -------
    >>> df = pd.DataFrame( {"col1": [1, 3, 5, 7, 9], "col2": [7, 4, 35, 14, 56]} )
    >>> df = minMaxScaler_f(df)
    """
    from sklearn.preprocessing import MinMaxScaler

    scaler = MinMaxScaler(feature_range)

    df_scaled = scaler.fit_transform(df.to_numpy())
    df_scaled = pd.DataFrame(df_scaled, columns=df.columns)

    return df_scaled

## Robust scaling	
Removes the median and scales the data according to the quantile range (defaults to `IQR`)
$$X_{scaled} = (X - X.median) / IQR$$
- `cons (+)`: Better at preserving the spread of the variable after transformation for skewed variables.


In [None]:
def robustScaler_f(df, feature_range=(0, 1)):
    """
    Example
    -------
    >>> df = pd.DataFrame( {"col1": [1, 3, 5, 7, 9], "col2": [7, 4, 35, 14, 56]} )
    >>> df = robustScaler_f(df)
    """
    from sklearn.preprocessing import RobustScaler

    scaler = RobustScaler()

    df_scaled = scaler.fit_transform(df.to_numpy())
    df_scaled = pd.DataFrame(df_scaled, columns=df.columns)

    return df_scaled


# Feature Selection  

In [None]:
import numpy as np
from sklearn.neighbors import KernelDensity
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.feature_selection import RFE
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import chi2
from sklearn.feature_selection import f_classif
from sklearn.feature_selection import mutual_info_classif
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
from scipy.stats import kruskal
from matplotlib.pyplot import figure
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
def feature_selection_f_classif(x, y):
    """
    Compute the ANOVA F-value for the provided sample.
    Example
    -------
    >>> fvals = feature_selection_f_classif(df_feat, hypno_30s)
    """
    k_best = SelectKBest(f_classif, k="all")
    fit = k_best.fit(x, y)
    # print("Scores: ", fit.scores_)
    # ranking = fit.get_support()
    # print("Ranking: ", ranking)
    # features = fit.transform(x)
    fvals = pd.Series(fit.scores_, index=df_feat.columns).sort_values()
    return fvals


# Compute the ANOVA F-value for the provided sample
fvals = feature_selection_f_classif(df_feat, hypno_30s)
# Plot features ranking
plt.figure(figsize=(10, 10))
plt.suptitle('ANOVA F-value for features')
sns.barplot(y=fvals.index, x=fvals, palette="RdYlGn")
plt.xlabel("ANOVA F-value")
plt.xticks(rotation=20)
plt.yticks(size=8)
plt.tight_layout()
plt.savefig("fs_fclassif.png", format="png")
plt.savefig("fs_fclassif.svg", format="svg")
plt.show()


In [None]:
def feature_selection_chi_squared(x, y):
    # Chi Squared
    # Feature extraction
    test = SelectKBest(score_func=chi2, k=10)
    fit = test.fit(x, y)
    # Summarize scores
    # np.set_printoptions(precision=3)
    # print(fit.scores_)
    # ranking = fit.get_support()
    # print("Ranking: ", ranking)
    # Summarize selected features
    # features = fit.transform(x)
    fvals = pd.Series(fit.scores_, index=df_feat.columns).sort_values()
    return fvals


# Compute the Chi-squared F-value for the provided sample
fvals = feature_selection_chi_squared(minMaxScaler_f(df_feat), hypno_30s)
# Plot features ranking
plt.figure(figsize=(10, 10))
plt.suptitle('Chi-squared stats of non-negative features')
sns.barplot(y=fvals.index, x=fvals, palette="RdYlGn")
plt.xlabel("F-values")
plt.xticks(rotation=20)
plt.yticks(size=8)
plt.tight_layout()
plt.savefig("fs_chi_squared.png", format="png")
plt.savefig("fs_chi_squared.svg", format="svg")
plt.show()


In [None]:
def feature_selection_kruskal_wallis(x, y):
    """
    The Kruskal-Wallis H-test tests the null hypothesis that the population median of all of the groups are equal. It is a non-parametric version of ANOVA. The test works on 2 or more independent samples, which may have different sizes. Note that rejecting the null hypothesis does not indicate which of the groups differs. Post hoc comparisons between groups are required to determine which groups are different.
    """
    rank = []
    for i in df_feat.columns:
        setX = df_feat[i].values
        rank.append(kruskal(setX, hypno_30s)[0])

    ranks = pd.Series(rank, index=df_feat.columns).sort_values()
    return ranks


# Compute the Kruskal-Wallis H statistic for the provided sample
hvals = feature_selection_kruskal_wallis(df_feat, hypno_30s)
# Plot features ranking
plt.figure(figsize=(10, 10))
plt.suptitle("Kruskal-Wallis statistics for features")
sns.barplot(y=hvals.index, x=hvals, palette="RdYlGn")
plt.xlabel("Kruskal-Wallis H statistic")
plt.xticks(rotation=20)
plt.yticks(size=8)
plt.tight_layout()
plt.savefig("fs_kruskal_wallis.png", format="png")
plt.savefig("fs_kruskal_wallis.svg", format="svg")
plt.show()


In [None]:
def feature_selection_mutual_info(x, y, k=10):
    """
    Preforms feature selection using the select_k_best with mutual_info_classif.
    Mutual information measures how much information the presence/absence of a feature contributes to making the correct prediction on Y.
    Recieves
    --------
        x -> feature dataframe
        y -> labels
        k -> number of remaining features
    Returns:
        keep_features -> list with the naime of the choosen features
    """
    clf = SelectKBest(mutual_info_classif, k=k)
    fit = clf.fit(x, y)
    fvals = pd.Series(fit.scores_, index=df_feat.columns).sort_values()
    return fvals


# Compute the estimated mutual information between each feature and the target for the provided sample
MI = feature_selection_mutual_info(df_feat, hypno_30s, 10)
# Plot features ranking
plt.figure(figsize=(10, 10))
plt.suptitle("Mutual Information between each feature and sleep stage")
sns.barplot(y=MI.index, x=MI, palette="RdYlGn")
plt.xlabel("Estimated mutual information")
plt.xticks(rotation=20)
plt.yticks(size=8)
plt.tight_layout()
plt.savefig("fs_mutual_info.png", format="png")
plt.savefig("fs_mutual_info.svg", format="svg")
plt.show()


In [None]:
def feature_selection_ROC(x, y):
    """
    Preforms feature selection using the ROC with LDA classifier
    Recieves:
        data -> data frame
    Returns:
        keep_features -> list with the naime of the choosen features
    """

    rank = []
    classifier = LinearDiscriminantAnalysis()
    X_train, X_test, y_train, y_test = train_test_split(
        x, y, test_size=0.25, random_state=None, shuffle=False
    )

    for i in X_train.columns:
        x_train_2d_array = X_train[i].to_frame()
        y_score = classifier.fit(x_train_2d_array, y_train)
        x_test_2d_array = X_test[i].to_frame()
        y_score = y_score.decision_function(x_test_2d_array)
        # Compute Area Under the Receiver Operating Characteristic Curve (ROC AUC) from prediction scores
        rank.append([i, roc_auc_score(y_test, y_score)])

    rank = sorted(rank, key=lambda x: x[1])
    rank = list(
        zip(*rank)
    )  # python trick: transform list to select 1st and 2nd columns of it with indexing
    fvals = pd.Series(rank[1], index=rank[0]).sort_values()
    return fvals


# Binazrize multiclass hypnogram to two (REM/NREM or 1/0) for ROC with LDA classifier to work
hypno_30s_bin = hypno_30s.copy()
hypno_30s_bin[~(hypno_30s == 4)] = 0
hypno_30s_bin[(hypno_30s == 4)] = 1
# Compute the ranks for the provided sample
aucVal = feature_selection_ROC(df_feat, hypno_30s_bin)
# Plot features ranking
plt.figure(figsize=(10, 10))
plt.suptitle("Feature selection using the ROC with LDA classifier")
sns.barplot(y=fvals.index, x=fvals, palette="RdYlGn")
plt.xlabel("Area Under the Receiver Operating Characteristic Curve")
plt.xticks(rotation=20)
plt.yticks(size=8)
plt.tight_layout()
plt.savefig("fs_auc_lda.png", format="png")
plt.savefig("fs_auc_lda.svg", format="svg")
plt.show()


In [None]:
from sklearn.preprocessing import MinMaxScaler


def feature_selection_kernel_density(x, y):
    """
    Preforms feature selection using the KernelDensity with gaussian kernel
    Recieves:
        data -> data frame
        n_features -> number of remaining features
    Returns:
        keep_features -> list with the naime of the choosen features
    """
    rank = []
    for column in x:
        df = x[column].values[..., np.newaxis]
        kde = KernelDensity(kernel="gaussian", bandwidth=0.2).fit(df)
        values = kde.score(df, y)  # Compute the total log-likelihood under the model.
        rank.append((column, values))

    rank = sorted(rank, key=lambda x: x[1], reverse=True)
    rank = list(
        zip(*rank)
    )  # python trick: transform list to select 1st and 2nd columns of it with indexing
    fvals = pd.Series(rank[1], index=rank[0]).sort_values()
    return fvals


# total log-likelihood under the mode
likeliUmodel = feature_selection_kernel_density(df_feat, hypno_30s)

scaler = MinMaxScaler()
likeliUmodel_scaled = scaler.fit_transform(likeliUmodel.values.reshape(-1, 1))
likeliUmodel_scaled = pd.Series(
    likeliUmodel_scaled.squeeze(), index=likeliUmodel.keys()
)


# Plot features ranking - unscaled
plt.figure(figsize=(10, 10))
plt.suptitle("Feature selection using the Kernel Density Estimation with gaussian kernel")
sns.barplot(y=likeliUmodel.index, x=fvals, palette="RdYlGn")
plt.xlabel("Total log-likelihood under the model")
plt.xticks(rotation=20)
plt.yticks(size=8)
plt.tight_layout()
plt.savefig("fs_KernelDensity_unscaled.png", format="png")
plt.savefig("fs_KernelDensity_unscaled.svg", format="svg")
plt.show()

# Plot features ranking - scaled
plt.figure(figsize=(10, 10))
plt.suptitle("Feature selection using the Kernel Density Estimation with gaussian kernel")
sns.barplot(y=likeliUmodel_scaled.index, x=likeliUmodel_scaled, palette="RdYlGn")
plt.xlabel("Total log-likelihood under the model ( scaled to [0,1] )")
plt.xticks(rotation=20)
plt.yticks(size=8)
plt.tight_layout()
plt.savefig("fs_KernelDensity_scaled.png", format="png")
plt.savefig("fs_KernelDensity_scaled.svg", format="svg")
plt.show()


In [None]:
def feature_selection_RFE(x, y):
    """
    Preforms feature selection using the RFE with LDA.
    Feature ranking with recursive feature elimination.
    Given an external estimator that assigns weights to features (e.g., the coefficients of a linear model), the goal of recursive feature elimination (RFE) is to select features by recursively considering smaller and smaller sets of features. First, the estimator is trained on the initial set of features and the importance of each feature is obtained either through any specific attribute or callable. Then, the least important features are pruned from current set of features. That procedure is recursively repeated on the pruned set until the desired number of features to select is eventually reached.
    """
    estimator = LinearDiscriminantAnalysis()
    selector = RFE(estimator, n_features_to_select=1)
    selector = selector.fit(x, y)
    ranks = pd.Series(selector.ranking_, index=df_feat.columns).sort_values()
    # Sort ascending
    ranks = ranks.max() - ranks + 1
    ranks = ranks.sort_values()
    return ranks


ranks = feature_selection_RFE(df_feat, hypno_30s)
# Plot features ranking - scaled
plt.figure(figsize=(10, 10))
plt.suptitle("Feature selection using the RFE with LDA")
sns.barplot(y=ranks.index, x=ranks, palette="RdYlGn")
plt.xlabel("Rank")
plt.xticks(rotation=20)
plt.yticks(size=8)
plt.tight_layout()
plt.savefig("fs_RFE_LDA.png", format="png")
plt.savefig("fs_RFE_LDA.svg", format="svg")
plt.show()


In [None]:
def pearson_correlation_print(df):
    # Using Pearson Correlation

    # plt.figure(figsize=(12, 10))
    corr_matrix = df.corr()
    # sns.heatmap(corr_matrix, annot=True, cmap=plt.cm.Reds)
    # plt.show()

    outputVariables = ["hmob"]
    for outputVariable in outputVariables:
        # Correlation with output variable
        cor_target = abs(corr_matrix[outputVariable])
        # Selecting highly correlated features
        relevant_features = cor_target[cor_target >= 0.8]
        print(f"RELEVANT ONES to {outputVariable}:")
        print(relevant_features)
        print()


def pearson_correlation_plot(df, thresh=0.95):
    plt.figure(figsize=(12, 10))
    plt.suptitle(f'Pearson Correlation Plot for Features (thresh = {thresh})')
    corr_matrix = df.corr()
    corr_matrix[abs(corr_matrix) >= thresh] = abs(corr_matrix)
    corr_matrix[abs(corr_matrix) < thresh] = 0
    # print(corr_matrix.shape)
    g = sns.heatmap(abs(corr_matrix), cmap=plt.cm.Reds, xticklabels=True, yticklabels=True)
    g.set_xticklabels(g.get_xmajorticklabels(), fontsize=7)
    g.set_yticklabels(g.get_ymajorticklabels(), fontsize=7)
    plt.tight_layout()
    plt.savefig(f"fs_pearson_correlation_thresh_{thresh}.png", format="png")
    plt.savefig(f"fs_pearson_correlation_thresh_{thresh}.svg", format="svg")
    plt.show()


pearson_correlation_print(df_feat)


In [None]:
# Plot hypnogram and a feature
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 6), sharex=True)

hypno = pd.Series(hypno_30s).map({-1: -1, 0: 0, 1: 2, 2: 3, 3: 4, 4: 1}).values
hypno_rem = np.ma.masked_not_equal(hypno, 1)

# Plot the hypnogram
ax1.step(times, -1 * hypno, color="k", lw=1.5)
ax1.step(times, -1 * hypno_rem, color="r", lw=2.5)
ax1.set_yticks([0, -1, -2, -3, -4])
ax1.set_yticklabels(["W", "R", "N1", "N2", "N3"])
ax1.set_ylim(-4.5, 0.5)
ax1.set_ylabel("Sleep stage")

# Plot the non-linear feature
ax2.plot(times, df_feat["bubbleEnt1"])
ax2.set_ylabel("Bubble Entropy")
# ax2.set_ylabel('Higuchi Fractal Dimension')
ax2.set_xlabel("Time [minutes]")

ax2.set_xlim(0, times[-1])

plt.tight_layout()
plt.show()


In [None]:
plt.hist(hypno_30s)