In [1]:
import glob
import os
import random
import numpy as np
import pandas as pd
from scipy.signal import resample
from scipy.interpolate import splev, splrep
from __future__ import absolute_import, division, print_function
from six.moves import map, range, zip
import six
import collections
import copy
import keyword
import re
import scipy.signal as ss
from scipy import interpolate, optimize
from scipy.stats import stats
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec

In [2]:
CHAT_SIGS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
chat_s_count = len(CHAT_SIGS)

CHAT_THRESHOLD = 3
CHAT_PREPROCESSED_PATH = 'D:\\data_chat_baseline_30x128_test'
CHAT_FREQ = 128
CHAT_EPOCH_LENGTH = 30
CHAT_ECG_SIG = 8
CHAT_OUT_PATH = "D:\\"

In [None]:
# -*- coding: utf-8 -*-
"""
biosppy.utils
-------------

This module provides several frequently used functions and hacks.

:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes
:license: BSD 3-clause, see LICENSE for more details.
"""

class Utils:
    @staticmethod
    def normpath(path):
        """Normalize a path.

        Parameters
        ----------
        path : str
            The path to normalize.

        Returns
        -------
        npath : str
            The normalized path.

        """

        if '~' in path:
            out = os.path.abspath(os.path.expanduser(path))
        else:
            out = os.path.abspath(path)

        return out

    @staticmethod
    def fileparts(path):
        """split a file path into its directory, name, and extension.

        Parameters
        ----------
        path : str
            Input file path.

        Returns
        -------
        dirname : str
            File directory.
        fname : str
            File name.
        ext : str
            File extension.

        Notes
        -----
        * Removes the dot ('.') from the extension.

        """

        dirname, fname = os.path.split(path)
        fname, ext = os.path.splitext(fname)
        ext = ext.replace('.', '')

        return dirname, fname, ext

    @staticmethod
    def fullfile(*args):
        """Join one or more file path components, assuming the last is
        the extension.

        Parameters
        ----------
        ``*args`` : list, optional
            Components to concatenate.

        Returns
        -------
        fpath : str
            The concatenated file path.

        """

        nb = len(args)
        if nb == 0:
            return ''
        elif nb == 1:
            return args[0]
        elif nb == 2:
            return os.path.join(*args)

        fpath = os.path.join(*args[:-1]) + '.' + args[-1]

        return fpath

    @staticmethod
    def walktree(top=None, spec=None):
        """Iterator to recursively descend a directory and return all files
        matching the spec.

        Parameters
        ----------
        top : str, optional
            Starting directory; if None, defaults to the current working directoty.
        spec : str, optional
            Regular expression to match the desired files;
            if None, matches all files; typical patterns:
                * `r'\.txt$'` - matches files with '.txt' extension;
                * `r'^File_'` - matches files starting with 'File\_'
                * `r'^File_.+\.txt$'` - matches files starting with 'File\_' and ending with the '.txt' extension.

        Yields
        ------
        fpath : str
            Absolute file path.

        Notes
        -----
        * Partial matches are also selected.

        See Also
        --------
        * https://docs.python.org/3/library/re.html
        * https://regex101.com/

        """

        if top is None:
            top = os.getcwd()

        if spec is None:
            spec = r'.*?'

        prog = re.compile(spec)

        for root, _, files in os.walk(top):
            for name in files:
                if prog.search(name):
                    fname = os.path.join(root, name)
                    yield fname

    @staticmethod
    def remainderAllocator(votes, k, reverse=True, check=False):
        """Allocate k seats proportionally using the Remainder Method.

        Also known as Hare-Niemeyer Method. Uses the Hare quota.

        Parameters
        ----------
        votes : list
            Number of votes for each class/party/cardinal.
        k : int
            Total number o seats to allocate.
        reverse : bool, optional
            If True, allocates remaining seats largest quota first.
        check : bool, optional
            If True, limits the number of seats to the total number of votes.

        Returns
        -------
        seats : list
            Number of seats for each class/party/cardinal.

        """

        # check total number of votes
        tot = np.sum(votes)
        if check and k > tot:
            k = tot

        # frequencies
        length = len(votes)
        freqs = np.array(votes, dtype='float') / tot

        # assign items
        aux = k * freqs
        seats = aux.astype('int')

        # leftovers
        nb = k - seats.sum()
        if nb > 0:
            if reverse:
                ind = np.argsort(aux - seats)[::-1]
            else:
                ind = np.argsort(aux - seats)

            for i in range(nb):
                seats[ind[i % length]] += 1

        return seats.tolist()

    @staticmethod
    def highestAveragesAllocator(votes, k, divisor='dHondt', check=False):
        """Allocate k seats proportionally using the Highest Averages Method.

        Parameters
        ----------
        votes : list
            Number of votes for each class/party/cardinal.
        k : int
            Total number o seats to allocate.
        divisor : str, optional
            Divisor method; one of 'dHondt', 'Huntington-Hill', 'Sainte-Lague',
            'Imperiali', or 'Danish'.
        check : bool, optional
            If True, limits the number of seats to the total number of votes.

        Returns
        -------
        seats : list
            Number of seats for each class/party/cardinal.

        """

        # check total number of cardinals
        tot = np.sum(votes)
        if check and k > tot:
            k = tot

        # select divisor
        if divisor == 'dHondt':
            fcn = lambda i: float(i)
        elif divisor == 'Huntington-Hill':
            fcn = lambda i: np.sqrt(i * (i + 1.))
        elif divisor == 'Sainte-Lague':
            fcn = lambda i: i - 0.5
        elif divisor == 'Imperiali':
            fcn = lambda i: float(i + 1)
        elif divisor == 'Danish':
            fcn = lambda i: 3. * (i - 1.) + 1.
        else:
            raise ValueError("Unknown divisor method.")

        # compute coefficients
        tab = []
        length = len(votes)
        D = [fcn(i) for i in range(1, k + 1)]
        for i in range(length):
            for j in range(k):
                tab.append((i, votes[i] / D[j]))

        # sort
        tab.sort(key=lambda item: item[1], reverse=True)
        tab = tab[:k]
        tab = np.array([item[0] for item in tab], dtype='int')

        seats = np.zeros(length, dtype='int')
        for i in range(length):
            seats[i] = np.sum(tab == i)

        return seats.tolist()

    @staticmethod
    def random_fraction(indx, fraction, sort=True):
        """Select a random fraction of an input list of elements.

        Parameters
        ----------
        indx : list, array
            Elements to partition.
        fraction : int, float
            Fraction to select.
        sort : bool, optional
            If True, output lists will be sorted.

        Returns
        -------
        use : list, array
            Selected elements.
        unuse : list, array
            Remaining elements.

        """

        # number of elements to use
        fraction = float(fraction)
        nb = int(fraction * len(indx))

        # copy because shuffle works in place
        aux = copy.deepcopy(indx)

        # shuffle
        np.random.shuffle(indx)

        # select
        use = aux[:nb]
        unuse = aux[nb:]

        # sort
        if sort:
            use.sort()
            unuse.sort()

        return use, unuse


    class ReturnTuple(tuple):
        """A named tuple to use as a hybrid tuple-dict return object.

        Parameters
        ----------
        values : iterable
            Return values.
        names : iterable, optional
            Names for return values.

        Raises
        ------
        ValueError
            If the number of values differs from the number of names.
        ValueError
            If any of the items in names:
            * contain non-alphanumeric characters;
            * are Python keywords;
            * start with a number;
            * are duplicates.

        """

        def __new__(cls, values, names=None):

            return tuple.__new__(cls, tuple(values))

        def __init__(self, values, names=None):

            nargs = len(values)

            if names is None:
                # create names
                names = ['_%d' % i for i in range(nargs)]
            else:
                # check length
                if len(names) != nargs:
                    raise ValueError("Number of names and values mismatch.")

                # convert to str
                names = list(map(str, names))

                # check for keywords, alphanumeric, digits, repeats
                seen = set()
                for name in names:
                    if not all(c.isalnum() or (c == '_') for c in name):
                        raise ValueError("Names can only contain alphanumeric \
                                          characters and underscores: %r." % name)

                    if keyword.iskeyword(name):
                        raise ValueError("Names cannot be a keyword: %r." % name)

                    if name[0].isdigit():
                        raise ValueError("Names cannot start with a number: %r." %
                                         name)

                    if name in seen:
                        raise ValueError("Encountered duplicate name: %r." % name)

                    seen.add(name)

            self._names = names

        def as_dict(self):
            """Convert to an ordered dictionary.

            Returns
            -------
            out : OrderedDict
                An OrderedDict representing the return values.

            """

            return collections.OrderedDict(zip(self._names, self))

        __dict__ = property(as_dict)

        def __getitem__(self, key):
            """Get item as an index or keyword.

            Returns
            -------
            out : object
                The object corresponding to the key, if it exists.

            Raises
            ------
            KeyError
                If the key is a string and it does not exist in the mapping.
            IndexError
                If the key is an int and it is out of range.

            """

            if isinstance(key, six.string_types):
                if key not in self._names:
                    raise KeyError("Unknown key: %r." % key)

                key = self._names.index(key)

            return super(ReturnTuple, self).__getitem__(key)

        def __repr__(self):
            """Return representation string."""

            tpl = '%s=%r'

            rp = ', '.join(tpl % item for item in zip(self._names, self))

            return 'ReturnTuple(%s)' % rp

        def __getnewargs__(self):
            """Return self as a plain tuple; used for copy and pickle."""

            return tuple(self)

        def keys(self):
            """Return the value names.

            Returns
            -------
            out : list
                The keys in the mapping.

            """

            return list(self._names)

utils = Utils

In [None]:
# -*- coding: utf-8 -*-
"""
biosppy.signals.tools
---------------------

This module provides various signal analysis methods in the time and
frequency domains.

:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes
:license: BSD 3-clause, see LICENSE for more details.
"""
class Tools:
    @staticmethod
    def _norm_freq(frequency=None, sampling_rate=1000.):
        """Normalize frequency to Nyquist Frequency (Fs/2).

        Parameters
        ----------
        frequency : int, float, list, array
            Frequencies to normalize.
        sampling_rate : int, float, optional
            Sampling frequency (Hz).

        Returns
        -------
        wn : float, array
            Normalized frequencies.

        """

        # check inputs
        if frequency is None:
            raise TypeError("Please specify a frequency to normalize.")

        # convert inputs to correct representation
        try:
            frequency = float(frequency)
        except TypeError:
            # maybe frequency is a list or array
            frequency = np.array(frequency, dtype='float')

        Fs = float(sampling_rate)

        wn = 2. * frequency / Fs

        return wn

    @staticmethod
    def _filter_init(b, a, alpha=1.):
        """Get an initial filter state that corresponds to the steady-state
        of the step response.

        Parameters
        ----------
        b : array
            Numerator coefficients.
        a : array
            Denominator coefficients.
        alpha : float, optional
            Scaling factor.

        Returns
        -------
        zi : array
            Initial filter state.

        """

        zi = alpha * ss.lfilter_zi(b, a)

        return zi

    @staticmethod
    def _filter_signal(b, a, signal, zi=None, check_phase=True, **kwargs):
        """Filter a signal with given coefficients.

        Parameters
        ----------
        b : array
            Numerator coefficients.
        a : array
            Denominator coefficients.
        signal : array
            Signal to filter.
        zi : array, optional
            Initial filter state.
        check_phase : bool, optional
            If True, use the forward-backward technique.
        ``**kwargs`` : dict, optional
            Additional keyword arguments are passed to the underlying filtering
            function.

        Returns
        -------
        filtered : array
            Filtered signal.
        zf : array
            Final filter state.

        Notes
        -----
        * If check_phase is True, zi cannot be set.

        """

        # check inputs
        if check_phase and zi is not None:
            raise ValueError(
                "Incompatible arguments: initial filter state cannot be set when \
                check_phase is True.")

        if zi is None:
            zf = None
            if check_phase:
                filtered = ss.filtfilt(b, a, signal, **kwargs)
            else:
                filtered = ss.lfilter(b, a, signal, **kwargs)
        else:
            filtered, zf = ss.lfilter(b, a, signal, zi=zi, **kwargs)

        return filtered, zf

    @staticmethod
    def _filter_resp(b, a, sampling_rate=1000., nfreqs=4096):
        """Compute the filter frequency response.

        Parameters
        ----------
        b : array
            Numerator coefficients.
        a : array
            Denominator coefficients.
        sampling_rate : int, float, optional
            Sampling frequency (Hz).
        nfreqs : int, optional
            Number of frequency points to compute.

        Returns
        -------
        freqs : array
            Array of frequencies (Hz) at which the response was computed.
        resp : array
            Frequency response.

        """

        w, resp = ss.freqz(b, a, nfreqs)

        # convert frequencies
        freqs = w * sampling_rate / (2. * np.pi)

        return freqs, resp

    @staticmethod
    def _get_window(kernel, size, **kwargs):
        """Return a window with the specified parameters.

        Parameters
        ----------
        kernel : str
            Type of window to create.
        size : int
            Size of the window.
        ``**kwargs`` : dict, optional
            Additional keyword arguments are passed to the underlying
            scipy.signal.windows function.

        Returns
        -------
        window : array
            Created window.

        """

        # mimics scipy.signal.get_window
        if kernel in ['blackman', 'black', 'blk']:
            winfunc = ss.blackman
        elif kernel in ['triangle', 'triang', 'tri']:
            winfunc = ss.triang
        elif kernel in ['hamming', 'hamm', 'ham']:
            winfunc = ss.windows.hamming
        elif kernel in ['bartlett', 'bart', 'brt']:
            winfunc = ss.bartlett
        elif kernel in ['hanning', 'hann', 'han']:
            winfunc = ss.hann
        elif kernel in ['blackmanharris', 'blackharr', 'bkh']:
            winfunc = ss.blackmanharris
        elif kernel in ['parzen', 'parz', 'par']:
            winfunc = ss.parzen
        elif kernel in ['bohman', 'bman', 'bmn']:
            winfunc = ss.bohman
        elif kernel in ['nuttall', 'nutl', 'nut']:
            winfunc = ss.nuttall
        elif kernel in ['barthann', 'brthan', 'bth']:
            winfunc = ss.barthann
        elif kernel in ['flattop', 'flat', 'flt']:
            winfunc = ss.flattop
        elif kernel in ['kaiser', 'ksr']:
            winfunc = ss.kaiser
        elif kernel in ['gaussian', 'gauss', 'gss']:
            winfunc = ss.gaussian
        elif kernel in ['general gaussian', 'general_gaussian', 'general gauss',
                        'general_gauss', 'ggs']:
            winfunc = ss.general_gaussian
        elif kernel in ['boxcar', 'box', 'ones', 'rect', 'rectangular']:
            winfunc = ss.boxcar
        elif kernel in ['slepian', 'slep', 'optimal', 'dpss', 'dss']:
            winfunc = ss.slepian
        elif kernel in ['cosine', 'halfcosine']:
            winfunc = ss.cosine
        elif kernel in ['chebwin', 'cheb']:
            winfunc = ss.chebwin
        else:
            raise ValueError("Unknown window type.")

        try:
            window = winfunc(size, **kwargs)
        except TypeError as e:
            raise TypeError("Invalid window arguments: %s." % e)

        return window

    @staticmethod
    def get_filter(ftype='FIR',
                   band='lowpass',
                   order=None,
                   frequency=None,
                   sampling_rate=1000., **kwargs):
        """Compute digital (FIR or IIR) filter coefficients with the given
        parameters.

        Parameters
        ----------
        ftype : str
            Filter type:
                * Finite Impulse Response filter ('FIR');
                * Butterworth filter ('butter');
                * Chebyshev filters ('cheby1', 'cheby2');
                * Elliptic filter ('ellip');
                * Bessel filter ('bessel').
        band : str
            Band type:
                * Low-pass filter ('lowpass');
                * High-pass filter ('highpass');
                * Band-pass filter ('bandpass');
                * Band-stop filter ('bandstop').
        order : int
            Order of the filter.
        frequency : int, float, list, array
            Cutoff frequencies; format depends on type of band:
                * 'lowpass' or 'bandpass': single frequency;
                * 'bandpass' or 'bandstop': pair of frequencies.
        sampling_rate : int, float, optional
            Sampling frequency (Hz).
        ``**kwargs`` : dict, optional
            Additional keyword arguments are passed to the underlying
            scipy.signal function.

        Returns
        -------
        b : array
            Numerator coefficients.
        a : array
            Denominator coefficients.

        See Also:
            scipy.signal

        """

        # check inputs
        if order is None:
            raise TypeError("Please specify the filter order.")
        if frequency is None:
            raise TypeError("Please specify the cutoff frequency.")
        if band not in ['lowpass', 'highpass', 'bandpass', 'bandstop']:
            raise ValueError(
                "Unknown filter type '%r'; choose 'lowpass', 'highpass', \
                'bandpass', or 'bandstop'."
                % band)

        # convert frequencies
        frequency = st._norm_freq(frequency, sampling_rate)

        # get coeffs
        b, a = [], []
        if ftype == 'FIR':
            # FIR filter
            if order % 2 == 0:
                order += 1
            a = np.array([1])
            if band in ['lowpass', 'bandstop']:
                b = ss.firwin(numtaps=order,
                              cutoff=frequency,
                              pass_zero=True, **kwargs)
            elif band in ['highpass', 'bandpass']:
                b = ss.firwin(numtaps=order,
                              cutoff=frequency,
                              pass_zero=False, **kwargs)
        elif ftype == 'butter':
            # Butterworth filter
            b, a = ss.butter(N=order,
                             Wn=frequency,
                             btype=band,
                             analog=False,
                             output='ba', **kwargs)
        elif ftype == 'cheby1':
            # Chebyshev type I filter
            b, a = ss.cheby1(N=order,
                             Wn=frequency,
                             btype=band,
                             analog=False,
                             output='ba', **kwargs)
        elif ftype == 'cheby2':
            # chevyshev type II filter
            b, a = ss.cheby2(N=order,
                             Wn=frequency,
                             btype=band,
                             analog=False,
                             output='ba', **kwargs)
        elif ftype == 'ellip':
            # Elliptic filter
            b, a = ss.ellip(N=order,
                            Wn=frequency,
                            btype=band,
                            analog=False,
                            output='ba', **kwargs)
        elif ftype == 'bessel':
            # Bessel filter
            b, a = ss.bessel(N=order,
                             Wn=frequency,
                             btype=band,
                             analog=False,
                             output='ba', **kwargs)

        return utils.ReturnTuple((b, a), ('b', 'a'))

    @staticmethod
    def filter_signal(signal=None,
                      ftype='FIR',
                      band='lowpass',
                      order=None,
                      frequency=None,
                      sampling_rate=1000., **kwargs):
        """Filter a signal according to the given parameters.

        Parameters
        ----------
        signal : array
            Signal to filter.
        ftype : str
            Filter type:
                * Finite Impulse Response filter ('FIR');
                * Butterworth filter ('butter');
                * Chebyshev filters ('cheby1', 'cheby2');
                * Elliptic filter ('ellip');
                * Bessel filter ('bessel').
        band : str
            Band type:
                * Low-pass filter ('lowpass');
                * High-pass filter ('highpass');
                * Band-pass filter ('bandpass');
                * Band-stop filter ('bandstop').
        order : int
            Order of the filter.
        frequency : int, float, list, array
            Cutoff frequencies; format depends on type of band:
                * 'lowpass' or 'bandpass': single frequency;
                * 'bandpass' or 'bandstop': pair of frequencies.
        sampling_rate : int, float, optional
            Sampling frequency (Hz).
        ``**kwargs`` : dict, optional
            Additional keyword arguments are passed to the underlying
            scipy.signal function.

        Returns
        -------
        signal : array
            Filtered signal.
        sampling_rate : float
            Sampling frequency (Hz).
        params : dict
            Filter parameters.

        Notes
        -----
        * Uses a forward-backward filter implementation. Therefore, the combined
          filter has linear phase.

        """

        # check inputs
        if signal is None:
            raise TypeError("Please specify a signal to filter.")

        # get filter
        b, a = st.get_filter(ftype=ftype,
                          order=order,
                          frequency=frequency,
                          sampling_rate=sampling_rate,
                          band=band, **kwargs)

        # filter
        filtered, _ = st._filter_signal(b, a, signal, check_phase=True)

        # output
        params = {
            'ftype': ftype,
            'order': order,
            'frequency': frequency,
            'band': band,
        }
        params.update(kwargs)

        args = (filtered, sampling_rate, params)
        names = ('signal', 'sampling_rate', 'params')

        return utils.ReturnTuple(args, names)

    class OnlineFilter(object):
        """Online filtering.

        Parameters
        ----------
        b : array
            Numerator coefficients.
        a : array
            Denominator coefficients.

        """

        def __init__(self, b=None, a=None):
            # check inputs
            if b is None:
                raise TypeError('Please specify the numerator coefficients.')

            if a is None:
                raise TypeError('Please specify the denominator coefficients.')

            # self things
            self.b = b
            self.a = a

            # reset
            self.reset()

        def reset(self):
            """Reset the filter state."""

            self.zi = ss.lfilter_zi(self.b, self.a)

        def filter(self, signal=None):
            """Filter a signal segment.

            Parameters
            ----------
            signal : array
                Signal segment to filter.

            Returns
            -------
            filtered : array
                Filtered signal segment.

            """

            # check input
            if signal is None:
                raise TypeError('Please specify the input signal.')

            filtered, zf = ss.lfilter(self.b, self.a, signal, zi=self.zi)
            self.zi = zf

            return utils.ReturnTuple((filtered, ), ('filtered', ))

    @staticmethod
    def smoother(signal=None, kernel='boxzen', size=10, mirror=True, **kwargs):
        """Smooth a signal using an N-point moving average [MAvg]_ filter.

        This implementation uses the convolution of a filter kernel with the input
        signal to compute the smoothed signal [Smit97]_.

        Availabel kernels: median, boxzen, boxcar, triang, blackman, hamming, hann,
        bartlett, flattop, parzen, bohman, blackmanharris, nuttall, barthann,
        kaiser (needs beta), gaussian (needs std), general_gaussian (needs power,
        width), slepian (needs width), chebwin (needs attenuation).

        Parameters
        ----------
        signal : array
            Signal to smooth.
        kernel : str, array, optional
            Type of kernel to use; if array, use directly as the kernel.
        size : int, optional
            Size of the kernel; ignored if kernel is an array.
        mirror : bool, optional
            If True, signal edges are extended to avoid boundary effects.
        ``**kwargs`` : dict, optional
            Additional keyword arguments are passed to the underlying
            scipy.signal.windows function.

        Returns
        -------
        signal : array
            Smoothed signal.
        params : dict
            Smoother parameters.

        Notes
        -----
        * When the kernel is 'median', mirror is ignored.

        References
        ----------
        .. [MAvg] Wikipedia, "Moving Average",
           http://en.wikipedia.org/wiki/Moving_average
        .. [Smit97] S. W. Smith, "Moving Average Filters - Implementation by
           Convolution", http://www.dspguide.com/ch15/1.htm, 1997

        """

        # check inputs
        if signal is None:
            raise TypeError("Please specify a signal to smooth.")

        length = len(signal)

        if isinstance(kernel, six.string_types):
            # check length
            if size > length:
                size = length - 1

            if size < 1:
                size = 1

            if kernel == 'boxzen':
                # hybrid method
                # 1st pass - boxcar kernel
                aux, _ = smoother(signal,
                                  kernel='boxcar',
                                  size=size,
                                  mirror=mirror)

                # 2nd pass - parzen kernel
                smoothed, _ = smoother(aux,
                                       kernel='parzen',
                                       size=size,
                                       mirror=mirror)

                params = {'kernel': kernel, 'size': size, 'mirror': mirror}

                args = (smoothed, params)
                names = ('signal', 'params')

                return utils.ReturnTuple(args, names)

            elif kernel == 'median':
                # median filter
                if size % 2 == 0:
                    raise ValueError(
                        "When the kernel is 'median', size must be odd.")

                smoothed = ss.medfilt(signal, kernel_size=size)

                params = {'kernel': kernel, 'size': size, 'mirror': mirror}

                args = (smoothed, params)
                names = ('signal', 'params')

                return utils.ReturnTuple(args, names)

            else:
                win = st._get_window(kernel, size, **kwargs)

        elif isinstance(kernel, np.ndarray):
            win = kernel
            size = len(win)

            # check length
            if size > length:
                raise ValueError("Kernel size is bigger than signal length.")

            if size < 1:
                raise ValueError("Kernel size is smaller than 1.")

        else:
            raise TypeError("Unknown kernel type.")

        # convolve
        w = win / win.sum()
        if mirror:
            aux = np.concatenate(
                (signal[0] * np.ones(size), signal, signal[-1] * np.ones(size)))
            smoothed = np.convolve(w, aux, mode='same')
            smoothed = smoothed[size:-size]
        else:
            smoothed = np.convolve(w, signal, mode='same')

        # output
        params = {'kernel': kernel, 'size': size, 'mirror': mirror}
        params.update(kwargs)

        args = (smoothed, params)
        names = ('signal', 'params')

        return utils.ReturnTuple(args, names)

    @staticmethod
    def analytic_signal(signal=None, N=None):
        """Compute analytic signal, using the Hilbert Transform.

        Parameters
        ----------
        signal : array
            Input signal.
        N : int, optional
            Number of Fourier components; default is `len(signal)`.

        Returns
        -------
        amplitude : array
            Amplitude envelope of the analytic signal.
        phase : array
            Instantaneous phase component of the analystic signal.

        """

        # check inputs
        if signal is None:
            raise TypeError("Please specify an input signal.")

        # hilbert transform
        asig = ss.hilbert(signal, N=N)

        # amplitude envelope
        amp = np.absolute(asig)

        # instantaneous
        phase = np.angle(asig)

        return utils.ReturnTuple((amp, phase), ('amplitude', 'phase'))

    @staticmethod
    def phase_locking(signal1=None, signal2=None, N=None):
        """Compute the Phase-Locking Factor (PLF) between two signals.

        Parameters
        ----------
        signal1 : array
            First input signal.
        signal2 : array
            Second input signal.
        N : int, optional
            Number of Fourier components.

        Returns
        -------
        plf : float
            The PLF between the two signals.

        """

        # check inputs
        if signal1 is None:
            raise TypeError("Please specify the first input signal.")

        if signal2 is None:
            raise TypeError("Please specify the second input signal.")

        if len(signal1) != len(signal2):
            raise ValueError("The input signals must have the same length.")

        # compute analytic signal
        asig1 = ss.hilbert(signal1, N=N)
        phase1 = np.angle(asig1)

        asig2 = ss.hilbert(signal2, N=N)
        phase2 = np.angle(asig2)

        # compute PLF
        plf = np.absolute(np.mean(np.exp(1j * (phase1 - phase2))))

        return utils.ReturnTuple((plf,), ('plf',))

    @staticmethod
    def power_spectrum(signal=None,
                       sampling_rate=1000.,
                       pad=None,
                       pow2=False,
                       decibel=True):
        """Compute the power spectrum of a signal (one-sided).

        Parameters
        ----------
        signal : array
            Input signal.
        sampling_rate : int, float, optional
            Sampling frequency (Hz).
        pad : int, optional
            Padding for the Fourier Transform (number of zeros added);
            defaults to no padding..
        pow2 : bool, optional
            If True, rounds the number of points `N = len(signal) + pad` to the
            nearest power of 2 greater than N.
        decibel : bool, optional
            If True, returns the power in decibels.

        Returns
        -------
        freqs : array
            Array of frequencies (Hz) at which the power was computed.
        power : array
            Power spectrum.

        """

        # check inputs
        if signal is None:
            raise TypeError("Please specify an input signal.")

        npoints = len(signal)

        if pad is not None:
            if pad >= 0:
                npoints += pad
            else:
                raise ValueError("Padding must be a positive integer.")

        # power of 2
        if pow2:
            npoints = 2 ** (np.ceil(np.log2(npoints)))

        Nyq = float(sampling_rate) / 2
        hpoints = npoints // 2

        freqs = np.linspace(0, Nyq, hpoints)
        power = np.abs(np.fft.fft(signal, npoints)) / npoints

        # one-sided
        power = power[:hpoints]
        power[1:] *= 2
        power = np.power(power, 2)

        if decibel:
            power = 10. * np.log10(power)

        return utils.ReturnTuple((freqs, power), ('freqs', 'power'))

    @staticmethod
    def welch_spectrum(signal=None,
                       sampling_rate=1000.,
                       size=None,
                       overlap=None,
                       window='hanning',
                       window_kwargs=None,
                       pad=None,
                       decibel=True):
        """Compute the power spectrum of a signal using Welch's method (one-sided).

        Parameters
        ----------
        signal : array
            Input signal.
        sampling_rate : int, float, optional
            Sampling frequency (Hz).
        size : int, optional
            Number of points in each Welch segment;
            defaults to the equivalent of 1 second;
            ignored when 'window' is an array.
        overlap : int, optional
            Number of points to overlap between segments; defaults to `size / 2`.
        window : str, array, optional
            Type of window to use.
        window_kwargs : dict, optional
            Additional keyword arguments to pass on window creation; ignored if
            'window' is an array.
        pad : int, optional
            Padding for the Fourier Transform (number of zeros added);
            defaults to no padding.
        decibel : bool, optional
            If True, returns the power in decibels.

        Returns
        -------
        freqs : array
            Array of frequencies (Hz) at which the power was computed.
        power : array
            Power spectrum.

        Notes
        -----
        * Detrends each Welch segment by removing the mean.

        """

        # check inputs
        if signal is None:
            raise TypeError("Please specify an input signal.")

        length = len(signal)
        sampling_rate = float(sampling_rate)

        if size is None:
            size = int(sampling_rate)

        if window_kwargs is None:
            window_kwargs = {}

        if isinstance(window, six.string_types):
            win = _get_window(window, size, **window_kwargs)
        elif isinstance(window, np.ndarray):
            win = window
            size = len(win)

        if size > length:
            raise ValueError('Segment size must be smaller than signal length.')

        if overlap is None:
            overlap = size // 2
        elif overlap > size:
            raise ValueError('Overlap must be smaller than segment size.')

        nfft = size
        if pad is not None:
            if pad >= 0:
                nfft += pad
            else:
                raise ValueError("Padding must be a positive integer.")

        freqs, power = ss.welch(
            signal,
            fs=sampling_rate,
            window=win,
            nperseg=size,
            noverlap=overlap,
            nfft=nfft,
            detrend='constant',
            return_onesided=True,
            scaling='spectrum',
        )

        # compensate one-sided
        power *= 2

        if decibel:
            power = 10. * np.log10(power)

        return utils.ReturnTuple((freqs, power), ('freqs', 'power'))

    @staticmethod
    def band_power(freqs=None, power=None, frequency=None, decibel=True):
        """Compute the avearge power in a frequency band.

        Parameters
        ----------
        freqs : array
            Array of frequencies (Hz) at which the power was computed.
        power : array
            Input power spectrum.
        frequency : list, array
            Pair of frequencies defining the band.
        decibel : bool, optional
            If True, input power is in decibels.

        Returns
        -------
        avg_power : float
            The average power in the band.

        """

        # check inputs
        if freqs is None:
            raise TypeError("Please specify the 'freqs' array.")

        if power is None:
            raise TypeError("Please specify the input power spectrum.")

        if len(freqs) != len(power):
            raise ValueError(
                "The input 'freqs' and 'power' arrays must have the same length.")

        if frequency is None:
            raise TypeError("Please specify the band frequencies.")

        try:
            f1, f2 = frequency
        except ValueError:
            raise ValueError("Input 'frequency' must be a pair of frequencies.")

        # make frequencies sane
        if f1 > f2:
            f1, f2 = f2, f1

        if f1 < freqs[0]:
            f1 = freqs[0]
        if f2 > freqs[-1]:
            f2 = freqs[-1]

        # average
        sel = np.nonzero(np.logical_and(f1 <= freqs, freqs <= f2))[0]

        if decibel:
            aux = 10 ** (power / 10.)
            avg = np.mean(aux[sel])
            avg = 10. * np.log10(avg)
        else:
            avg = np.mean(power[sel])

        return utils.ReturnTuple((avg,), ('avg_power',))

    @staticmethod
    def signal_stats(signal=None):
        """Compute various metrics describing the signal.

        Parameters
        ----------
        signal : array
            Input signal.

        Returns
        -------
        mean : float
            Mean of the signal.
        median : float
            Median of the signal.
        max : float
            Maximum signal amplitude.
        var : float
            Signal variance (unbiased).
        std_dev : float
            Standard signal deviation (unbiased).
        abs_dev : float
            Absolute signal deviation.
        kurtosis : float
            Signal kurtosis (unbiased).
        skew : float
            Signal skewness (unbiased).

        """

        # check inputs
        if signal is None:
            raise TypeError("Please specify an input signal.")

        # ensure numpy
        signal = np.array(signal)

        # mean
        mean = np.mean(signal)

        # median
        median = np.median(signal)

        # maximum amplitude
        maxAmp = np.abs(signal - mean).max()

        # variance
        sigma2 = signal.var(ddof=1)

        # standard deviation
        sigma = signal.std(ddof=1)

        # absolute deviation
        ad = np.sum(np.abs(signal - median))

        # kurtosis
        kurt = stats.kurtosis(signal, bias=False)

        # skweness
        skew = stats.skew(signal, bias=False)

        # output
        args = (mean, median, maxAmp, sigma2, sigma, ad, kurt, skew)
        names = ('mean', 'median', 'max', 'var', 'std_dev', 'abs_dev', 'kurtosis',
                 'skewness')

        return utils.ReturnTuple(args, names)

    @staticmethod
    def normalize(signal=None, ddof=1):
        """Normalize a signal to zero mean and unitary standard deviation.

        Parameters
        ----------
        signal : array
            Input signal.
        ddof : int, optional
            Delta degrees of freedom for standard deviation computation;
            the divisor is `N - ddof`, where `N` is the number of elements;
            default is one.

        Returns
        -------
        signal : array
            Normalized signal.

        """

        # check inputs
        if signal is None:
            raise TypeError("Please specify an input signal.")

        # ensure numpy
        signal = np.array(signal)

        normalized = signal - signal.mean()
        normalized /= normalized.std(ddof=ddof)

        return utils.ReturnTuple((normalized,), ('signal',))

    @staticmethod
    def zero_cross(signal=None, detrend=False):
        """Locate the indices where the signal crosses zero.

        Parameters
        ----------
        signal : array
            Input signal.
        detrend : bool, optional
            If True, remove signal mean before computation.

        Returns
        -------
        zeros : array
            Indices of zero crossings.

        Notes
        -----
        * When the signal crosses zero between samples, the first index
          is returned.

        """

        # check inputs
        if signal is None:
            raise TypeError("Please specify an input signal.")

        if detrend:
            signal = signal - np.mean(signal)

        # zeros
        df = np.diff(np.sign(signal))
        zeros = np.nonzero(np.abs(df) > 0)[0]

        return utils.ReturnTuple((zeros,), ('zeros',))

    @staticmethod
    def find_extrema(signal=None, mode='both'):
        """Locate local extrema points in a signal.

        Based on Fermat's Theorem [Ferm]_.

        Parameters
        ----------
        signal : array
            Input signal.
        mode : str, optional
            Whether to find maxima ('max'), minima ('min'), or both ('both').

        Returns
        -------
        extrema : array
            Indices of the extrama points.
        values : array
            Signal values at the extrema points.

        References
        ----------
        .. [Ferm] Wikipedia, "Fermat's theorem (stationary points)",
           https://en.wikipedia.org/wiki/Fermat%27s_theorem_(stationary_points)

        """

        # check inputs
        if signal is None:
            raise TypeError("Please specify an input signal.")

        if mode not in ['max', 'min', 'both']:
            raise ValueError("Unknwon mode %r." % mode)

        aux = np.diff(np.sign(np.diff(signal)))

        if mode == 'both':
            aux = np.abs(aux)
            extrema = np.nonzero(aux > 0)[0] + 1
        elif mode == 'max':
            extrema = np.nonzero(aux < 0)[0] + 1
        elif mode == 'min':
            extrema = np.nonzero(aux > 0)[0] + 1

        values = signal[extrema]

        return utils.ReturnTuple((extrema, values), ('extrema', 'values'))

    @staticmethod
    def windower(signal=None,
                 size=None,
                 step=None,
                 fcn=None,
                 fcn_kwargs=None,
                 kernel='boxcar',
                 kernel_kwargs=None):
        """Apply a function to a signal in sequential windows, with optional overlap.

        Availabel window kernels: boxcar, triang, blackman, hamming, hann,
        bartlett, flattop, parzen, bohman, blackmanharris, nuttall, barthann,
        kaiser (needs beta), gaussian (needs std), general_gaussian (needs power,
        width), slepian (needs width), chebwin (needs attenuation).

        Parameters
        ----------
        signal : array
            Input signal.
        size : int
            Size of the signal window.
        step : int, optional
            Size of window shift; if None, there is no overlap.
        fcn : callable
            Function to apply to each window.
        fcn_kwargs : dict, optional
            Additional keyword arguments to pass to 'fcn'.
        kernel : str, array, optional
            Type of kernel to use; if array, use directly as the kernel.
        kernel_kwargs : dict, optional
            Additional keyword arguments to pass on window creation; ignored if
            'kernel' is an array.

        Returns
        -------
        index : array
            Indices characterizing window locations (start of the window).
        values : array
            Concatenated output of calling 'fcn' on each window.

        """

        # check inputs
        if signal is None:
            raise TypeError("Please specify an input signal.")

        if fcn is None:
            raise TypeError("Please specify a function to apply to each window.")

        if fcn_kwargs is None:
            fcn_kwargs = {}

        if kernel_kwargs is None:
            kernel_kwargs = {}

        length = len(signal)

        if isinstance(kernel, six.string_types):
            # check size
            if size > length:
                raise ValueError("Window size must be smaller than signal length.")

            win = _get_window(kernel, size, **kernel_kwargs)
        elif isinstance(kernel, np.ndarray):
            win = kernel
            size = len(win)

            # check size
            if size > length:
                raise ValueError("Window size must be smaller than signal length.")

        if step is None:
            step = size

        if step <= 0:
            raise ValueError("Step size must be at least 1.")

        # number of windows
        nb = 1 + (length - size) // step

        # check signal dimensionality
        if np.ndim(signal) == 2:
            # time along 1st dim, tile window
            nch = np.shape(signal)[1]
            win = np.tile(np.reshape(win, (size, 1)), nch)

        index = []
        values = []
        for i in range(nb):
            start = i * step
            stop = start + size
            index.append(start)

            aux = signal[start:stop] * win

            # apply function
            out = fcn(aux, **fcn_kwargs)
            values.append(out)

        # transform to numpy
        index = np.array(index, dtype='int')
        values = np.array(values)

        return utils.ReturnTuple((index, values), ('index', 'values'))

    @staticmethod
    def synchronize(x=None, y=None, detrend=True):
        """Align two signals based on cross-correlation.

        Parameters
        ----------
        x : array
            First input signal.
        y : array
            Second input signal.
        detrend : bool, optional
            If True, remove signal means before computation.

        Returns
        -------
        delay : int
            Delay (number of samples) of 'x' in relation to 'y';
            if 'delay' < 0 , 'x' is ahead in relation to 'y';
            if 'delay' > 0 , 'x' is delayed in relation to 'y'.
        corr : float
            Value of maximum correlation.
        synch_x : array
            Biggest possible portion of 'x' in synchronization.
        synch_y : array
            Biggest possible portion of 'y' in synchronization.

        """

        # check inputs
        if x is None:
            raise TypeError("Please specify the first input signal.")

        if y is None:
            raise TypeError("Please specify the second input signal.")

        n1 = len(x)
        n2 = len(y)

        if detrend:
            x = x - np.mean(x)
            y = y - np.mean(y)

        # correlate
        corr = np.correlate(x, y, mode='full')
        d = np.arange(-n2 + 1, n1, dtype='int')
        ind = np.argmax(corr)

        delay = d[ind]
        maxCorr = corr[ind]

        # get synchronization overlap
        if delay < 0:
            c = min([n1, len(y[-delay:])])
            synch_x = x[:c]
            synch_y = y[-delay:-delay + c]
        elif delay > 0:
            c = min([n2, len(x[delay:])])
            synch_x = x[delay:delay + c]
            synch_y = y[:c]
        else:
            c = min([n1, n2])
            synch_x = x[:c]
            synch_y = y[:c]

        # output
        args = (delay, maxCorr, synch_x, synch_y)
        names = ('delay', 'corr', 'synch_x', 'synch_y')

        return utils.ReturnTuple(args, names)

    @staticmethod
    def pearson_correlation(x=None, y=None):
        """Compute the Pearson Correlation Coefficient bertween two signals.

        The coefficient is given by:

        .. math::

            r_{xy} = \\frac{E[(X - \\mu_X) (Y - \\mu_Y)]}{\\sigma_X \\sigma_Y}

        Parameters
        ----------
        x : array
            First input signal.
        y : array
            Second input signal.

        Returns
        -------
        rxy : float
            Pearson correlation coefficient, ranging between -1 and +1.

        Raises
        ------
        ValueError
            If the input signals do not have the same length.

        """

        # check inputs
        if x is None:
            raise TypeError("Please specify the first input signal.")

        if y is None:
            raise TypeError("Please specify the second input signal.")

        # ensure numpy
        x = np.array(x)
        y = np.array(y)

        n = len(x)

        if n != len(y):
            raise ValueError('Input signals must have the same length.')

        mx = np.mean(x)
        my = np.mean(y)

        Sxy = np.sum(x * y) - n*mx*my
        Sxx = np.sum(np.power(x, 2)) - n * mx**2
        Syy = np.sum(np.power(x, 2)) - n * my**2

        rxy = Sxy / (np.sqrt(Sxx) * np.sqrt(Syy))

        # avoid propagation of numerical errors
        if rxy > 1.0:
            rxy = 1.0
        elif rxy < -1.0:
            rxy = -1.0

        return utils.ReturnTuple((rxy, ), ('rxy', ))

    @staticmethod
    def rms_error(x=None, y=None):
        """Compute the Root-Mean-Square Error between two signals.

        The error is given by:

        .. math::

            rmse = \\sqrt{E[(X - Y)^2]}

        Parameters
        ----------
        x : array
            First input signal.
        y : array
            Second input signal.

        Returns
        -------
        rmse : float
            Root-mean-square error.

        Raises
        ------
        ValueError
            If the input signals do not have the same length.

        """

        # check inputs
        if x is None:
            raise TypeError("Please specify the first input signal.")

        if y is None:
            raise TypeError("Please specify the second input signal.")

        # ensure numpy
        x = np.array(x)
        y = np.array(y)

        n = len(x)

        if n != len(y):
            raise ValueError('Input signals must have the same length.')

        rmse = np.sqrt(np.mean(np.power(x - y, 2)))

        return utils.ReturnTuple((rmse, ), ('rmse', ))

    @staticmethod
    def get_heart_rate(beats=None, sampling_rate=1000., smooth=False, size=3):
        """Compute instantaneous heart rate from an array of beat indices.

        Parameters
        ----------
        beats : array
            Beat location indices.
        sampling_rate : int, float, optional
            Sampling frequency (Hz).
        smooth : bool, optional
            If True, perform smoothing on the resulting heart rate.
        size : int, optional
            Size of smoothing window; ignored if `smooth` is False.

        Returns
        -------
        index : array
            Heart rate location indices.
        heart_rate : array
            Instantaneous heart rate (bpm).

        Notes
        -----
        * Assumes normal human heart rate to be between 40 and 200 bpm.

        """

        # check inputs
        if beats is None:
            raise TypeError("Please specify the input beat indices.")

        if len(beats) < 2:
            raise ValueError("Not enough beats to compute heart rate.")

        # compute heart rate
        ts = beats[1:]
        hr = sampling_rate * (60. / np.diff(beats))

        # physiological limits
        indx = np.nonzero(np.logical_and(hr >= 40, hr <= 200))
        ts = ts[indx]
        hr = hr[indx]

        # smooth with moving average
        if smooth and (len(hr) > 1):
            hr, _ = smoother(signal=hr, kernel='boxcar', size=size, mirror=True)

        return utils.ReturnTuple((ts, hr), ('index', 'heart_rate'))

    @staticmethod
    def _pdiff(x, p1, p2):
        """Compute the squared difference between two interpolators, given the
        x-coordinates.

        Parameters
        ----------
        x : array
            Array of x-coordinates.
        p1 : object
            First interpolator.
        p2 : object
            Second interpolator.

        Returns
        -------
        diff : array
            Squared differences.

        """

        diff = (p1(x) - p2(x)) ** 2

        return diff

    @staticmethod
    def find_intersection(x1=None,
                          y1=None,
                          x2=None,
                          y2=None,
                          alpha=1.5,
                          xtol=1e-6,
                          ytol=1e-6):
        """Find the intersection points between two lines using piecewise
        polynomial interpolation.

        Parameters
        ----------
        x1 : array
            Array of x-coordinates of the first line.
        y1 : array
            Array of y-coordinates of the first line.
        x2 : array
            Array of x-coordinates of the second line.
        y2 : array
            Array of y-coordinates of the second line.
        alpha : float, optional
            Resolution factor for the x-axis; fraction of total number of
            x-coordinates.
        xtol : float, optional
            Tolerance for the x-axis.
        ytol : float, optional
            Tolerance for the y-axis.

        Returns
        -------
        roots : array
            Array of x-coordinates of found intersection points.
        values : array
            Array of y-coordinates of found intersection points.

        Notes
        -----
        * If no intersection is found, returns the closest point.

        """

        # check inputs
        if x1 is None:
            raise TypeError("Please specify the x-coordinates of the first line.")
        if y1 is None:
            raise TypeError("Please specify the y-coordinates of the first line.")
        if x2 is None:
            raise TypeError("Please specify the x-coordinates of the second line.")
        if y2 is None:
            raise TypeError("Please specify the y-coordinates of the second line.")

        # ensure numpy
        x1 = np.array(x1)
        y1 = np.array(y1)
        x2 = np.array(x2)
        y2 = np.array(y2)

        if x1.shape != y1.shape:
            raise ValueError(
                "Input coordinates for the first line must have the same shape.")
        if x2.shape != y2.shape:
            raise ValueError(
                "Input coordinates for the second line must have the same shape.")

        # interpolate
        p1 = interpolate.BPoly.from_derivatives(x1, y1[:, np.newaxis])
        p2 = interpolate.BPoly.from_derivatives(x2, y2[:, np.newaxis])

        # combine x intervals
        x = np.r_[x1, x2]
        x_min = x.min()
        x_max = x.max()
        npoints = int(len(np.unique(x)) * alpha)
        x = np.linspace(x_min, x_max, npoints)

        # initial estimates
        pd = p1(x) - p2(x)
        zerocs, = zero_cross(pd)

        pd_abs = np.abs(pd)
        zeros = np.nonzero(pd_abs < ytol)[0]

        ind = np.unique(np.concatenate((zerocs, zeros)))
        xi = x[ind]

        # search for solutions
        roots = set()
        for v in xi:
            root, _, ier, _ = optimize.fsolve(
                _pdiff,
                v,
                args=(p1, p2),
                full_output=True,
                xtol=xtol,
            )
            if ier == 1 and x_min <= root <= x_max:
                roots.add(root[0])

        if len(roots) == 0:
            # no solution was found => give the best from the initial estimates
            aux = np.abs(pd)
            bux = aux.min() * np.ones(npoints, dtype='float')
            roots, _ = find_intersection(x, aux, x, bux,
                                         alpha=1.,
                                         xtol=xtol,
                                         ytol=ytol)

        # compute values
        roots = list(roots)
        roots.sort()
        roots = np.array(roots)
        values = np.mean(np.vstack((p1(roots), p2(roots))), axis=0)

        return utils.ReturnTuple((roots, values), ('roots', 'values'))

    @staticmethod
    def finite_difference(signal=None, weights=None):
        """Apply the Finite Difference method to compute derivatives.

        Parameters
        ----------
        signal : array
            Signal to differentiate.
        weights : list, array
            Finite difference weight coefficients.

        Returns
        -------
        index : array
            Indices from `signal` for which the derivative was computed.
        derivative : array
            Computed derivative.

        Notes
        -----
        * The method assumes central differences weights.
        * The method accounts for the delay introduced by the algorithm.

        Raises
        ------
        ValueError
            If the number of weights is not odd.

        """

        # check inputs
        if signal is None:
            raise TypeError("Please specify a signal to differentiate.")

        if weights is None:
            raise TypeError("Please specify the weight coefficients.")

        N = len(weights)
        if N % 2 == 0:
            raise ValueError("Number of weights must be odd.")

        # diff
        weights = weights[::-1]
        derivative = ss.lfilter(weights, [1], signal)

        # trim delay
        D = N - 1
        D2 = D // 2

        index = np.arange(D2, len(signal) - D2, dtype='int')
        derivative = derivative[D:]

        return utils.ReturnTuple((index, derivative), ('index', 'derivative'))

    @staticmethod
    def _init_dist_profile(m, n, signal):
        """Compute initial time series signal statistics for distance profile.

        Implements the algorithm described in [Mueen2014]_, using the notation
        from [Yeh2016_a]_.

        Parameters
        ----------
        m : int
            Sub-sequence length.
        n : int
            Target signal length.
        signal : array
            Target signal.

        Returns
        -------
        X : array
            Fourier Transform (FFT) of the signal.
        sigma : array
            Moving standard deviation in windows of length `m`.

        References
        ----------
        .. [Mueen2014] Abdullah Mueen, Hossein Hamooni, "Trilce Estrada: Time
           Series Join on Subsequence Correlation", ICDM 2014: 450-459
        .. [Yeh2016_a] Chin-Chia Michael Yeh, Yan Zhu, Liudmila Ulanova,
           Nurjahan Begum, Yifei Ding, Hoang Anh Dau, Diego Furtado Silva,
           Abdullah Mueen, Eamonn Keogh, "Matrix Profile I: All Pairs Similarity
           Joins for Time Series: A Unifying View that Includes Motifs, Discords
           and Shapelets", IEEE ICDM 2016

        """

        # compute signal stats
        csumx = np.zeros(n+1, dtype='float')
        csumx[1:] = np.cumsum(signal)
        sumx = csumx[m:] - csumx[:-m]

        csumx2 = np.zeros(n+1, dtype='float')
        csumx2[1:] = np.cumsum(np.power(signal, 2))
        sumx2 = csumx2[m:] - csumx2[:-m]

        meanx = sumx / m
        sigmax2 = (sumx2 / m) - np.power(meanx, 2)
        sigma = np.sqrt(sigmax2)

        # append zeros
        x = np.concatenate((signal, np.zeros(n, dtype='float')))

        # compute FFT
        X = np.fft.fft(x)

        return X, sigma

    @staticmethod
    def _ditance_profile(m, n, query, X, sigma):
        """Compute the distance profile of a query sequence against a signal.

        Implements the algorithm described in [Mueen2014]_, using the notation
        from [Yeh2016]_.

        Parameters
        ----------
        m : int
            Query sub-sequence length.
        n : int
            Target time series length.
        query : array
            Query sub-sequence.
        X : array
            Target time series Fourier Transform (FFT).
        sigma : array
            Moving standard deviation in windows of length `m`.

        Returns
        -------
        dist : array
            Distance profile (squared).

        Notes
        -----
        * Computes distances on z-normalized data.

        References
        ----------
        .. [Mueen2014] Abdullah Mueen, Hossein Hamooni, "Trilce Estrada: Time
           Series Join on Subsequence Correlation", ICDM 2014: 450-459
        .. [Yeh2016] Chin-Chia Michael Yeh, Yan Zhu, Liudmila Ulanova,
           Nurjahan Begum, Yifei Ding, Hoang Anh Dau, Diego Furtado Silva,
           Abdullah Mueen, Eamonn Keogh, "Matrix Profile I: All Pairs Similarity
           Joins for Time Series: A Unifying View that Includes Motifs, Discords
           and Shapelets", IEEE ICDM 2016

        """

        # normalize query
        q = (query - np.mean(query)) / np.std(query)

        # reverse query and append zeros
        y = np.concatenate((q[::-1], np.zeros(2*n - m, dtype='float')))

        # compute dot products fast
        Y = np.fft.fft(y)
        Z = X * Y
        z = np.fft.ifft(Z)
        z = z[m-1:n]

        # compute distances (z-normalized squared euclidean distance)
        dist = 2 * m * (1 - z / (m * sigma))

        return dist

    @staticmethod
    def distance_profile(query=None, signal=None, metric='euclidean'):
        """Compute the distance profile of a query sequence against a signal.

        Implements the algorithm described in [Mueen2014]_.

        Parameters
        ----------
        query : array
            Input query signal sequence.
        signal : array
            Input target time series signal.
        metric : str, optional
            The distance metric to use; one of 'euclidean' or 'pearson'; default
            is 'euclidean'.

        Returns
        -------
        dist : array
            Distance of the query sequence to every sub-sequnce in the signal.

        Notes
        -----
        * Computes distances on z-normalized data.

        References
        ----------
        .. [Mueen2014] Abdullah Mueen, Hossein Hamooni, "Trilce Estrada: Time
           Series Join on Subsequence Correlation", ICDM 2014: 450-459

        """

        # check inputs
        if query is None:
            raise TypeError("Please specify the input query sequence.")

        if signal is None:
            raise TypeError("Please specify the input time series signal.")

        if metric not in ['euclidean', 'pearson']:
            raise ValueError("Unknown distance metric.")

        # ensure numpy
        query = np.array(query)
        signal = np.array(signal)

        m = len(query)
        n = len(signal)
        if m > n/2:
            raise ValueError("Time series signal is too short relative to"
                             " query length.")

        # get initial signal stats
        X, sigma = _init_dist_profile(m, n, signal)

        # compute distance profile
        dist = _ditance_profile(m, n, query, X, sigma)

        if metric == 'pearson':
            dist = 1 - np.abs(dist) / (2 * m)
        elif metric == 'euclidean':
            dist = np.abs(np.sqrt(dist))

        return utils.ReturnTuple((dist, ), ('dist', ))

    @staticmethod
    def signal_self_join(signal=None, size=None, index=None, limit=None):
        """Compute the matrix profile for a self-similarity join of a time series.

        Implements the algorithm described in [Yeh2016_b]_.

        Parameters
        ----------
        signal : array
            Input target time series signal.
        size : int
            Size of the query sub-sequences.
        index : list, array, optional
            Starting indices for query sub-sequences; the default is to search all
            sub-sequences.
        limit : int, optional
            Upper limit for the number of query sub-sequences; the default is to
            search all sub-sequences.

        Returns
        -------
        matrix_index : array
            Matric profile index.
        matrix_profile : array
            Computed matrix profile (distances).

        Notes
        -----
        * Computes euclidean distances on z-normalized data.

        References
        ----------
        .. [Yeh2016_b] Chin-Chia Michael Yeh, Yan Zhu, Liudmila Ulanova,
           Nurjahan Begum, Yifei Ding, Hoang Anh Dau, Diego Furtado Silva,
           Abdullah Mueen, Eamonn Keogh, "Matrix Profile I: All Pairs Similarity
           Joins for Time Series: A Unifying View that Includes Motifs, Discords
           and Shapelets", IEEE ICDM 2016

        """

        # check inputs
        if signal is None:
            raise TypeError("Please specify the input time series signal.")

        if size is None:
            raise TypeError("Please specify the sub-sequence size.")

        # ensure numpy
        signal = np.array(signal)

        n = len(signal)
        if size > n/2:
            raise ValueError("Time series signal is too short relative to desired"
                             " sub-sequence length.")

        if size < 4:
            raise ValueError("Sub-sequence length must be at least 4.")

        # matrix profile length
        nb = n - size + 1

        # get search index
        if index is None:
            index = np.random.permutation(np.arange(nb, dtype='int'))
        else:
            index = np.array(index)
            if not np.all(index < nb):
                raise ValueError("Provided `index` exceeds allowable sub-sequences.")

        # limit search
        if limit is not None:
            if limit < 1:
                raise ValueError("Search limit must be at least 1.")

            index = index[:limit]

        # exclusion zone (to avoid query self-matches)
        ezone = int(round(size / 4))

        # initialization
        matrix_profile = np.inf * np.ones(nb, dtype='float')
        matrix_index = np.zeros(nb, dtype='int')

        X, sigma = _init_dist_profile(size, n, signal)

        # compute matrix profile
        for idx in index:
            # compute distance profile
            query = signal[idx:idx+size]
            dist = _ditance_profile(size, n, query, X, sigma)
            dist = np.abs(np.sqrt(dist)) # to have euclidean distance

            # apply exlusion zone
            a = max([0, idx-ezone])
            b = min([nb, idx+ezone+1])
            dist[a:b] = np.inf

            # find nearest neighbors
            pos = dist < matrix_profile
            matrix_profile[pos] = dist[pos]
            matrix_index[pos] = idx

            # account for exlusion zone
            neighbor = np.argmin(dist)
            matrix_profile[idx] = dist[neighbor]
            matrix_index[idx] = neighbor

        # output
        args = (matrix_index, matrix_profile)
        names = ('matrix_index', 'matrix_profile')

        return utils.ReturnTuple(args, names)

    @staticmethod
    def signal_cross_join(signal1=None,
                          signal2=None,
                          size=None,
                          index=None,
                          limit=None):
        """Compute the matrix profile for a similarity join of two time series.

        Computes the nearest sub-sequence in `signal2` for each sub-sequence in
        `signal1`. Implements the algorithm described in [Yeh2016_c]_.

        Parameters
        ----------
        signal1 : array
            Fisrt input time series signal.
        signal2 : array
            Second input time series signal.
        size : int
            Size of the query sub-sequences.
        index : list, array, optional
            Starting indices for query sub-sequences; the default is to search all
            sub-sequences.
        limit : int, optional
            Upper limit for the number of query sub-sequences; the default is to
            search all sub-sequences.

        Returns
        -------
        matrix_index : array
            Matric profile index.
        matrix_profile : array
            Computed matrix profile (distances).

        Notes
        -----
        * Computes euclidean distances on z-normalized data.

        References
        ----------
        .. [Yeh2016_c] Chin-Chia Michael Yeh, Yan Zhu, Liudmila Ulanova,
           Nurjahan Begum, Yifei Ding, Hoang Anh Dau, Diego Furtado Silva,
           Abdullah Mueen, Eamonn Keogh, "Matrix Profile I: All Pairs Similarity
           Joins for Time Series: A Unifying View that Includes Motifs, Discords
           and Shapelets", IEEE ICDM 2016

        """

        # check inputs
        if signal1 is None:
            raise TypeError("Please specify the first input time series signal.")

        if signal2 is None:
            raise TypeError("Please specify the second input time series signal.")

        if size is None:
            raise TypeError("Please specify the sub-sequence size.")

        # ensure numpy
        signal1 = np.array(signal1)
        signal2 = np.array(signal2)

        n1 = len(signal1)
        if size > n1/2:
            raise ValueError("First time series signal is too short relative to"
                             " desired sub-sequence length.")

        n2 = len(signal2)
        if size > n2/2:
            raise ValueError("Second time series signal is too short relative to"
                             " desired sub-sequence length.")

        if size < 4:
            raise ValueError("Sub-sequence length must be at least 4.")

        # matrix profile length
        nb1 = n1 - size + 1
        nb2 = n2 - size + 1

        # get search index
        if index is None:
            index = np.random.permutation(np.arange(nb2, dtype='int'))
        else:
            index = np.array(index)
            if not np.all(index < nb2):
                raise ValueError("Provided `index` exceeds allowable `signal2`"
                                 " sub-sequences.")

        # limit search
        if limit is not None:
            if limit < 1:
                raise ValueError("Search limit must be at least 1.")

            index = index[:limit]

        # initialization
        matrix_profile = np.inf * np.ones(nb1, dtype='float')
        matrix_index = np.zeros(nb1, dtype='int')

        X, sigma = _init_dist_profile(size, n1, signal1)

        # compute matrix profile
        for idx in index:
            # compute distance profile
            query = signal2[idx:idx+size]
            dist = _ditance_profile(size, n1, query, X, sigma)
            dist = np.abs(np.sqrt(dist)) # to have euclidean distance

            # find nearest neighbor
            pos = dist <= matrix_profile
            matrix_profile[pos] = dist[pos]
            matrix_index[pos] = idx

        # output
        args = (matrix_index, matrix_profile)
        names = ('matrix_index', 'matrix_profile')

        return utils.ReturnTuple(args, names)


    def mean_waves(data=None, size=None, step=None):
        """Extract mean samples from a data set.

        Parameters
        ----------
        data : array
            An m by n array of m data samples in an n-dimensional space.
        size : int
            Number of samples to use for each mean sample.
        step : int, optional
            Number of samples to jump, controlling overlap; default is equal to
            `size` (no overlap).

        Returns
        -------
        waves : array
            An k by n array of mean samples.

        Notes
        -----
        * Discards trailing samples if they are not enough to satify the `size`
          parameter.

        Raises
        ------
        ValueError
            If `step` is an invalid value.
        ValueError
            If there are not enough samples for the given `size`.

        """

        # check inputs
        if data is None:
            raise TypeError("Please specify an input data set.")

        if size is None:
            raise TypeError("Please specify the number of samples for the mean.")

        if step is None:
            step = size

        if step < 0:
            raise ValueError("The step must be a positive integer.")

        # number of waves
        L = len(data) - size
        nb = 1 + L // step
        if nb <= 0:
            raise ValueError("Not enough samples for the given `size`.")

        # compute
        waves = [np.mean(data[i:i+size], axis=0) for i in range(0, L+1, step)]
        waves = np.array(waves)

        return utils.ReturnTuple((waves, ), ('waves', ))


    def median_waves(data=None, size=None, step=None):
        """Extract median samples from a data set.

        Parameters
        ----------
        data : array
            An m by n array of m data samples in an n-dimensional space.
        size : int
            Number of samples to use for each median sample.
        step : int, optional
            Number of samples to jump, controlling overlap; default is equal to
            `size` (no overlap).

        Returns
        -------
        waves : array
            An k by n array of median samples.

        Notes
        -----
        * Discards trailing samples if they are not enough to satify the `size`
          parameter.

        Raises
        ------
        ValueError
            If `step` is an invalid value.
        ValueError
            If there are not enough samples for the given `size`.

        """

        # check inputs
        if data is None:
            raise TypeError("Please specify an input data set.")

        if size is None:
            raise TypeError("Please specify the number of samples for the median.")

        if step is None:
            step = size

        if step < 0:
            raise ValueError("The step must be a positive integer.")

        # number of waves
        L = len(data) - size
        nb = 1 + L // step
        if nb <= 0:
            raise ValueError("Not enough samples for the given `size`.")

        # compute
        waves = [np.median(data[i:i+size], axis=0) for i in range(0, L+1, step)]
        waves = np.array(waves)

        return utils.ReturnTuple((waves, ), ('waves', ))

st = Tools

In [None]:
# -*- coding: utf-8 -*-
"""
biosppy.plotting
----------------

This module provides utilities to plot data.

:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes
:license: BSD 3-clause, see LICENSE for more details.
"""
# Globals
MAJOR_LW = 2.5
MINOR_LW = 1.5
MAX_ROWS = 10


class Plotting:
    @staticmethod
    def _plot_filter(b, a, sampling_rate=1000., nfreqs=4096, ax=None):
        """Compute and plot the frequency response of a digital filter.

        Parameters
        ----------
        b : array
            Numerator coefficients.
        a : array
            Denominator coefficients.
        sampling_rate : int, float, optional
            Sampling frequency (Hz).
        nfreqs : int, optional
            Number of frequency points to compute.
        ax : axis, optional
            Plot Axis to use.

        Returns
        -------
        fig : Figure
            Figure object.

        """

        # compute frequency response
        freqs, resp = st._filter_resp(b, a,
                                      sampling_rate=sampling_rate,
                                      nfreqs=nfreqs)

        # plot
        if ax is None:
            fig = plt.figure()
            ax = fig.add_subplot(111)
        else:
            fig = ax.figure

        # amplitude
        pwr = 20. * np.log10(np.abs(resp))
        ax.semilogx(freqs, pwr, 'b', linewidth=MAJOR_LW)
        ax.set_ylabel('Amplitude (dB)', color='b')
        ax.set_xlabel('Frequency (Hz)')

        # phase
        angles = np.unwrap(np.angle(resp))
        ax2 = ax.twinx()
        ax2.semilogx(freqs, angles, 'g', linewidth=MAJOR_LW)
        ax2.set_ylabel('Angle (radians)', color='g')

        ax.grid()

        return fig

    @staticmethod
    def plot_filter(ftype='FIR',
                    band='lowpass',
                    order=None,
                    frequency=None,
                    sampling_rate=1000.,
                    path=None,
                    show=True, **kwargs):
        """Plot the frequency response of the filter specified with the given
        parameters.

        Parameters
        ----------
        ftype : str
            Filter type:
                * Finite Impulse Response filter ('FIR');
                * Butterworth filter ('butter');
                * Chebyshev filters ('cheby1', 'cheby2');
                * Elliptic filter ('ellip');
                * Bessel filter ('bessel').
        band : str
            Band type:
                * Low-pass filter ('lowpass');
                * High-pass filter ('highpass');
                * Band-pass filter ('bandpass');
                * Band-stop filter ('bandstop').
        order : int
            Order of the filter.
        frequency : int, float, list, array
            Cutoff frequencies; format depends on type of band:
                * 'lowpass' or 'bandpass': single frequency;
                * 'bandpass' or 'bandstop': pair of frequencies.
        sampling_rate : int, float, optional
            Sampling frequency (Hz).
        path : str, optional
            If provided, the plot will be saved to the specified file.
        show : bool, optional
            If True, show the plot immediately.
        ``**kwargs`` : dict, optional
            Additional keyword arguments are passed to the underlying
            scipy.signal function.

        """

        # get filter
        b, a = st.get_filter(ftype=ftype,
                             band=band,
                             order=order,
                             frequency=frequency,
                             sampling_rate=sampling_rate, **kwargs)

        # plot
        fig = _plot_filter(b, a, sampling_rate)

        # make layout tight
        fig.tight_layout()

        # save to file
        if path is not None:
            path = utils.normpath(path)
            root, ext = os.path.splitext(path)
            ext = ext.lower()
            if ext not in ['png', 'jpg']:
                path = root + '.png'

            fig.savefig(path, dpi=200, bbox_inches='tight')

        # show
        if show:
            plt.show()
        else:
            # close
            plt.close(fig)

    @staticmethod
    def plot_spectrum(signal=None, sampling_rate=1000., path=None, show=True):
        """Plot the power spectrum of a signal (one-sided).

        Parameters
        ----------
        signal : array
            Input signal.
        sampling_rate : int, float, optional
            Sampling frequency (Hz).
        path : str, optional
            If provided, the plot will be saved to the specified file.
        show : bool, optional
            If True, show the plot immediately.

        """

        freqs, power = st.power_spectrum(signal, sampling_rate,
                                         pad=0,
                                         pow2=False,
                                         decibel=True)

        fig = plt.figure()
        ax = fig.add_subplot(111)

        ax.plot(freqs, power, linewidth=MAJOR_LW)
        ax.set_xlabel('Frequency (Hz)')
        ax.set_ylabel('Power (dB)')
        ax.grid()

        # make layout tight
        fig.tight_layout()

        # save to file
        if path is not None:
            path = utils.normpath(path)
            root, ext = os.path.splitext(path)
            ext = ext.lower()
            if ext not in ['png', 'jpg']:
                path = root + '.png'

            fig.savefig(path, dpi=200, bbox_inches='tight')

        # show
        if show:
            plt.show()
        else:
            # close
            plt.close(fig)

    @staticmethod
    def plot_bvp(ts=None,
                 raw=None,
                 filtered=None,
                 onsets=None,
                 heart_rate_ts=None,
                 heart_rate=None,
                 path=None,
                 show=False):
        """Create a summary plot from the output of signals.bvp.bvp.

        Parameters
        ----------
        ts : array
            Signal time axis reference (seconds).
        raw : array
            Raw BVP signal.
        filtered : array
            Filtered BVP signal.
        onsets : array
            Indices of BVP pulse onsets.
        heart_rate_ts : array
            Heart rate time axis reference (seconds).
        heart_rate : array
            Instantaneous heart rate (bpm).
        path : str, optional
            If provided, the plot will be saved to the specified file.
        show : bool, optional
            If True, show the plot immediately.

        """

        fig = plt.figure()
        fig.suptitle('BVP Summary')

        # raw signal
        ax1 = fig.add_subplot(311)

        ax1.plot(ts, raw, linewidth=MAJOR_LW, label='Raw')

        ax1.set_ylabel('Amplitude')
        ax1.legend()
        ax1.grid()

        # filtered signal with onsets
        ax2 = fig.add_subplot(312, sharex=ax1)

        ymin = np.min(filtered)
        ymax = np.max(filtered)
        alpha = 0.1 * (ymax - ymin)
        ymax += alpha
        ymin -= alpha

        ax2.plot(ts, filtered, linewidth=MAJOR_LW, label='Filtered')
        ax2.vlines(ts[onsets], ymin, ymax,
                   color='m',
                   linewidth=MINOR_LW,
                   label='Onsets')

        ax2.set_ylabel('Amplitude')
        ax2.legend()
        ax2.grid()

        # heart rate
        ax3 = fig.add_subplot(313, sharex=ax1)

        ax3.plot(heart_rate_ts, heart_rate, linewidth=MAJOR_LW, label='Heart Rate')

        ax3.set_xlabel('Time (s)')
        ax3.set_ylabel('Heart Rate (bpm)')
        ax3.legend()
        ax3.grid()

        # make layout tight
        fig.tight_layout()

        # save to file
        if path is not None:
            path = utils.normpath(path)
            root, ext = os.path.splitext(path)
            ext = ext.lower()
            if ext not in ['png', 'jpg']:
                path = root + '.png'

            fig.savefig(path, dpi=200, bbox_inches='tight')

        # show
        if show:
            plt.show()
        else:
            # close
            plt.close(fig)

    @staticmethod
    def plot_eda(ts=None,
                 raw=None,
                 filtered=None,
                 onsets=None,
                 peaks=None,
                 amplitudes=None,
                 path=None,
                 show=False):
        """Create a summary plot from the output of signals.eda.eda.

        Parameters
        ----------
        ts : array
            Signal time axis reference (seconds).
        raw : array
            Raw EDA signal.
        filtered : array
            Filtered EDA signal.
        onsets : array
            Indices of SCR pulse onsets.
        peaks : array
            Indices of the SCR peaks.
        amplitudes : array
            SCR pulse amplitudes.
        path : str, optional
            If provided, the plot will be saved to the specified file.
        show : bool, optional
            If True, show the plot immediately.

        """

        fig = plt.figure()
        fig.suptitle('EDA Summary')

        # raw signal
        ax1 = fig.add_subplot(311)

        ax1.plot(ts, raw, linewidth=MAJOR_LW, label='raw')

        ax1.set_ylabel('Amplitude')
        ax1.legend()
        ax1.grid()

        # filtered signal with onsets, peaks
        ax2 = fig.add_subplot(312, sharex=ax1)

        ymin = np.min(filtered)
        ymax = np.max(filtered)
        alpha = 0.1 * (ymax - ymin)
        ymax += alpha
        ymin -= alpha

        ax2.plot(ts, filtered, linewidth=MAJOR_LW, label='Filtered')
        ax2.vlines(ts[onsets], ymin, ymax,
                   color='m',
                   linewidth=MINOR_LW,
                   label='Onsets')
        ax2.vlines(ts[peaks], ymin, ymax,
                   color='g',
                   linewidth=MINOR_LW,
                   label='Peaks')

        ax2.set_ylabel('Amplitude')
        ax2.legend()
        ax2.grid()

        # amplitudes
        ax3 = fig.add_subplot(313, sharex=ax1)

        ax3.plot(ts[onsets], amplitudes, linewidth=MAJOR_LW, label='Amplitudes')

        ax3.set_xlabel('Time (s)')
        ax3.set_ylabel('Amplitude')
        ax3.legend()
        ax3.grid()

        # make layout tight
        fig.tight_layout()

        # save to file
        if path is not None:
            path = utils.normpath(path)
            root, ext = os.path.splitext(path)
            ext = ext.lower()
            if ext not in ['png', 'jpg']:
                path = root + '.png'

            fig.savefig(path, dpi=200, bbox_inches='tight')

        # show
        if show:
            plt.show()
        else:
            # close
            plt.close(fig)

    @staticmethod
    def plot_emg(ts=None,
                 sampling_rate=None,
                 raw=None,
                 filtered=None,
                 onsets=None,
                 processed=None,
                 path=None,
                 show=False):
        """Create a summary plot from the output of signals.emg.emg.

        Parameters
        ----------
        ts : array
            Signal time axis reference (seconds).
        sampling_rate : int, float
            Sampling frequency (Hz).
        raw : array
            Raw EMG signal.
        filtered : array
            Filtered EMG signal.
        onsets : array
            Indices of EMG pulse onsets.
        processed : array, optional
            Processed EMG signal according to the chosen onset detector.
        path : str, optional
            If provided, the plot will be saved to the specified file.
        show : bool, optional
            If True, show the plot immediately.

        """

        fig = plt.figure()
        fig.suptitle('EMG Summary')

        if processed is not None:
            ax1 = fig.add_subplot(311)
            ax2 = fig.add_subplot(312, sharex=ax1)
            ax3 = fig.add_subplot(313)

            # processed signal
            L = len(processed)
            T = (L - 1) / sampling_rate
            ts_processed = np.linspace(0, T, L, endpoint=False)
            ax3.plot(ts_processed, processed,
                     linewidth=MAJOR_LW,
                     label='Processed')
            ax3.set_xlabel('Time (s)')
            ax3.set_ylabel('Amplitude')
            ax3.legend()
            ax3.grid()
        else:
            ax1 = fig.add_subplot(211)
            ax2 = fig.add_subplot(212, sharex=ax1)

        # raw signal
        ax1.plot(ts, raw, linewidth=MAJOR_LW, label='Raw')

        ax1.set_ylabel('Amplitude')
        ax1.legend()
        ax1.grid()

        # filtered signal with onsets
        ymin = np.min(filtered)
        ymax = np.max(filtered)
        alpha = 0.1 * (ymax - ymin)
        ymax += alpha
        ymin -= alpha

        ax2.plot(ts, filtered, linewidth=MAJOR_LW, label='Filtered')
        ax2.vlines(ts[onsets], ymin, ymax,
                   color='m',
                   linewidth=MINOR_LW,
                   label='Onsets')

        ax2.set_xlabel('Time (s)')
        ax2.set_ylabel('Amplitude')
        ax2.legend()
        ax2.grid()

        # make layout tight
        fig.tight_layout()

        # save to file
        if path is not None:
            path = utils.normpath(path)
            root, ext = os.path.splitext(path)
            ext = ext.lower()
            if ext not in ['png', 'jpg']:
                path = root + '.png'

            fig.savefig(path, dpi=200, bbox_inches='tight')

        # show
        if show:
            plt.show()
        else:
            # close
            plt.close(fig)

    @staticmethod
    def plot_resp(ts=None,
                  raw=None,
                  filtered=None,
                  zeros=None,
                  resp_rate_ts=None,
                  resp_rate=None,
                  path=None,
                  show=False):
        """Create a summary plot from the output of signals.bvp.bvp.

        Parameters
        ----------
        ts : array
            Signal time axis reference (seconds).
        raw : array
            Raw Resp signal.
        filtered : array
            Filtered Resp signal.
        zeros : array
            Indices of Respiration zero crossings.
        resp_rate_ts : array
            Respiration rate time axis reference (seconds).
        resp_rate : array
            Instantaneous respiration rate (Hz).
        path : str, optional
            If provided, the plot will be saved to the specified file.
        show : bool, optional
            If True, show the plot immediately.

        """

        fig = plt.figure()
        fig.suptitle('Respiration Summary')

        # raw signal
        ax1 = fig.add_subplot(311)

        ax1.plot(ts, raw, linewidth=MAJOR_LW, label='Raw')

        ax1.set_ylabel('Amplitude')
        ax1.legend()
        ax1.grid()

        # filtered signal with zeros
        ax2 = fig.add_subplot(312, sharex=ax1)

        ymin = np.min(filtered)
        ymax = np.max(filtered)
        alpha = 0.1 * (ymax - ymin)
        ymax += alpha
        ymin -= alpha

        ax2.plot(ts, filtered, linewidth=MAJOR_LW, label='Filtered')
        ax2.vlines(ts[zeros], ymin, ymax,
                   color='m',
                   linewidth=MINOR_LW,
                   label='Zero crossings')

        ax2.set_ylabel('Amplitude')
        ax2.legend()
        ax2.grid()

        # heart rate
        ax3 = fig.add_subplot(313, sharex=ax1)

        ax3.plot(resp_rate_ts, resp_rate,
                 linewidth=MAJOR_LW,
                 label='Respiration Rate')

        ax3.set_xlabel('Time (s)')
        ax3.set_ylabel('Respiration Rate (Hz)')
        ax3.legend()
        ax3.grid()

        # make layout tight
        fig.tight_layout()

        # save to file
        if path is not None:
            path = utils.normpath(path)
            root, ext = os.path.splitext(path)
            ext = ext.lower()
            if ext not in ['png', 'jpg']:
                path = root + '.png'

            fig.savefig(path, dpi=200, bbox_inches='tight')

        # show
        if show:
            plt.show()
        else:
            # close
            plt.close(fig)

    @staticmethod
    def plot_eeg(ts=None,
                 raw=None,
                 filtered=None,
                 labels=None,
                 features_ts=None,
                 theta=None,
                 alpha_low=None,
                 alpha_high=None,
                 beta=None,
                 gamma=None,
                 plf_pairs=None,
                 plf=None,
                 path=None,
                 show=False):
        """Create a summary plot from the output of signals.eeg.eeg.

        Parameters
        ----------
        ts : array
            Signal time axis reference (seconds).
        raw : array
            Raw EEG signal.
        filtered : array
            Filtered EEG signal.
        labels : list
            Channel labels.
        features_ts : array
            Features time axis reference (seconds).
        theta : array
            Average power in the 4 to 8 Hz frequency band; each column is one
            EEG channel.
        alpha_low : array
            Average power in the 8 to 10 Hz frequency band; each column is one
            EEG channel.
        alpha_high : array
            Average power in the 10 to 13 Hz frequency band; each column is one
            EEG channel.
        beta : array
            Average power in the 13 to 25 Hz frequency band; each column is one
            EEG channel.
        gamma : array
            Average power in the 25 to 40 Hz frequency band; each column is one
            EEG channel.
        plf_pairs : list
            PLF pair indices.
        plf : array
            PLF matrix; each column is a channel pair.
        path : str, optional
            If provided, the plot will be saved to the specified file.
        show : bool, optional
            If True, show the plot immediately.

        """

        nrows = MAX_ROWS
        alpha = 2.

        figs = []

        # raw
        fig = _plot_multichannel(ts=ts,
                                 signal=raw,
                                 labels=labels,
                                 nrows=nrows,
                                 alpha=alpha,
                                 title='EEG Summary - Raw',
                                 xlabel='Time (s)',
                                 ylabel='Amplitude')
        figs.append(('_Raw', fig))

        # filtered
        fig = _plot_multichannel(ts=ts,
                                 signal=filtered,
                                 labels=labels,
                                 nrows=nrows,
                                 alpha=alpha,
                                 title='EEG Summary - Filtered',
                                 xlabel='Time (s)',
                                 ylabel='Amplitude')
        figs.append(('_Filtered', fig))

        # band-power
        names = ('Theta Band', 'Lower Alpha Band', 'Higher Alpha Band',
                 'Beta Band', 'Gamma Band')
        args = (theta, alpha_low, alpha_high, beta, gamma)
        for n, a in zip(names, args):
            fig = _plot_multichannel(ts=features_ts,
                                     signal=a,
                                     labels=labels,
                                     nrows=nrows,
                                     alpha=alpha,
                                     title='EEG Summary - %s' % n,
                                     xlabel='Time (s)',
                                     ylabel='Power')
            figs.append(('_' + n.replace(' ', '_'), fig))

        # PLF
        plf_labels = ['%s vs %s' % (labels[p[0]], labels[p[1]]) for p in plf_pairs]
        fig = _plot_multichannel(ts=features_ts,
                                 signal=plf,
                                 labels=plf_labels,
                                 nrows=nrows,
                                 alpha=alpha,
                                 title='EEG Summary - Phase-Locking Factor',
                                 xlabel='Time (s)',
                                 ylabel='PLF')
        figs.append(('_PLF', fig))

        # save to file
        if path is not None:
            path = utils.normpath(path)
            root, ext = os.path.splitext(path)
            ext = ext.lower()
            if ext not in ['png', 'jpg']:
                ext = '.png'

            for n, fig in figs:
                path = root + n + ext
                fig.savefig(path, dpi=200, bbox_inches='tight')

        # show
        if show:
            plt.show()
        else:
            # close
            for _, fig in figs:
                plt.close(fig)

    @staticmethod
    def _yscaling(signal=None, alpha=1.5):
        """Get y axis limits for a signal with scaling.

        Parameters
        ----------
        signal : array
            Input signal.
        alpha : float, optional
            Scaling factor.

        Returns
        -------
        ymin : float
            Minimum y value.
        ymax : float
            Maximum y value.

        """

        mi = np.min(signal)
        m = np.mean(signal)
        mx = np.max(signal)

        if mi == mx:
            ymin = m - 1
            ymax = m + 1
        else:
            ymin = m - alpha * (m - mi)
            ymax = m + alpha * (mx - m)

        return ymin, ymax

    @staticmethod
    def _plot_multichannel(ts=None,
                           signal=None,
                           labels=None,
                           nrows=10,
                           alpha=2.,
                           title=None,
                           xlabel=None,
                           ylabel=None):
        """Plot a multi-channel signal.

        Parameters
        ----------
        ts : array
            Signal time axis reference (seconds).
        signal : array
            Multi-channel signal; each column is one channel.
        labels : list, optional
            Channel labels.
        nrows : int, optional
            Maximum number of rows to use.
        alpha : float, optional
            Scaling factor for y axis.
        title : str, optional
            Plot title.
        xlabel : str, optional
            Label for x axis.
        ylabel : str, optional
            Label for y axis.

        Returns
        -------
        fig : Figure
            Figure object.

        """

        # ensure numpy
        signal = np.array(signal)
        nch = signal.shape[1]

        # check labels
        if labels is None:
            labels = ['Ch. %d' % i for i in range(nch)]

        if nch < nrows:
            nrows = nch

        ncols = int(np.ceil(nch / float(nrows)))

        fig = plt.figure()

        # title
        if title is not None:
            fig.suptitle(title)

        gs = gridspec.GridSpec(nrows, ncols, hspace=0, wspace=0.2)

        # reference axes
        ax0 = fig.add_subplot(gs[0, 0])
        ax0.plot(ts, signal[:, 0], linewidth=MAJOR_LW, label=labels[0])
        ymin, ymax = _yscaling(signal[:, 0], alpha=alpha)
        ax0.set_ylim(ymin, ymax)
        ax0.legend()
        ax0.grid()
        axs = {(0, 0): ax0}

        for i in range(1, nch - 1):
            a = i % nrows
            b = int(np.floor(i / float(nrows)))
            ax = fig.add_subplot(gs[a, b], sharex=ax0)
            axs[(a, b)] = ax

            ax.plot(ts, signal[:, i], linewidth=MAJOR_LW, label=labels[i])
            ymin, ymax = _yscaling(signal[:, i], alpha=alpha)
            ax.set_ylim(ymin, ymax)
            ax.legend()
            ax.grid()

        # last plot
        i = nch - 1
        a = i % nrows
        b = int(np.floor(i / float(nrows)))
        ax = fig.add_subplot(gs[a, b], sharex=ax0)
        axs[(a, b)] = ax

        ax.plot(ts, signal[:, -1], linewidth=MAJOR_LW, label=labels[-1])
        ymin, ymax = _yscaling(signal[:, -1], alpha=alpha)
        ax.set_ylim(ymin, ymax)
        ax.legend()
        ax.grid()

        if xlabel is not None:
            ax.set_xlabel(xlabel)

            for b in range(0, ncols - 1):
                a = nrows - 1
                ax = axs[(a, b)]
                ax.set_xlabel(xlabel)

        if ylabel is not None:
            # middle left
            a = nrows // 2
            ax = axs[(a, 0)]
            ax.set_ylabel(ylabel)

        # make layout tight
        gs.tight_layout(fig)

        return fig

    @staticmethod
    def plot_ecg(ts=None,
                 raw=None,
                 filtered=None,
                 rpeaks=None,
                 templates_ts=None,
                 templates=None,
                 heart_rate_ts=None,
                 heart_rate=None,
                 path=None,
                 show=False):
        """Create a summary plot from the output of signals.ecg.ecg.

        Parameters
        ----------
        ts : array
            Signal time axis reference (seconds).
        raw : array
            Raw ECG signal.
        filtered : array
            Filtered ECG signal.
        rpeaks : array
            R-peak location indices.
        templates_ts : array
            Templates time axis reference (seconds).
        templates : array
            Extracted heartbeat templates.
        heart_rate_ts : array
            Heart rate time axis reference (seconds).
        heart_rate : array
            Instantaneous heart rate (bpm).
        path : str, optional
            If provided, the plot will be saved to the specified file.
        show : bool, optional
            If True, show the plot immediately.

        """

        fig = plt.figure()
        fig.suptitle('ECG Summary')
        gs = gridspec.GridSpec(6, 2)

        # raw signal
        ax1 = fig.add_subplot(gs[:2, 0])

        ax1.plot(ts, raw, linewidth=MAJOR_LW, label='Raw')

        ax1.set_ylabel('Amplitude')
        ax1.legend()
        ax1.grid()

        # filtered signal with rpeaks
        ax2 = fig.add_subplot(gs[2:4, 0], sharex=ax1)

        ymin = np.min(filtered)
        ymax = np.max(filtered)
        alpha = 0.1 * (ymax - ymin)
        ymax += alpha
        ymin -= alpha

        ax2.plot(ts, filtered, linewidth=MAJOR_LW, label='Filtered')
        ax2.vlines(ts[rpeaks], ymin, ymax,
                   color='m',
                   linewidth=MINOR_LW,
                   label='R-peaks')

        ax2.set_ylabel('Amplitude')
        ax2.legend()
        ax2.grid()

        # heart rate
        ax3 = fig.add_subplot(gs[4:, 0], sharex=ax1)

        ax3.plot(heart_rate_ts, heart_rate, linewidth=MAJOR_LW, label='Heart Rate')

        ax3.set_xlabel('Time (s)')
        ax3.set_ylabel('Heart Rate (bpm)')
        ax3.legend()
        ax3.grid()

        # templates
        ax4 = fig.add_subplot(gs[1:5, 1])

        ax4.plot(templates_ts, templates.T, 'm', linewidth=MINOR_LW, alpha=0.7)

        ax4.set_xlabel('Time (s)')
        ax4.set_ylabel('Amplitude')
        ax4.set_title('Templates')
        ax4.grid()

        # make layout tight
        gs.tight_layout(fig)

        # save to file
        if path is not None:
            path = utils.normpath(path)
            root, ext = os.path.splitext(path)
            ext = ext.lower()
            if ext not in ['png', 'jpg']:
                path = root + '.png'

            fig.savefig(path, dpi=200, bbox_inches='tight')

        # show
        if show:
            plt.show()
        else:
            # close
            plt.close(fig)

    @staticmethod
    def _plot_rates(thresholds, rates, variables,
                    lw=1,
                    colors=None,
                    alpha=1,
                    eer_idx=None,
                    labels=False,
                    ax=None):
        """Plot biometric rates.

        Parameters
        ----------
        thresholds : array
            Classifier thresholds.
        rates : dict
            Dictionary of rates.
        variables : list
            Keys from 'rates' to plot.
        lw : int, float, optional
            Plot linewidth.
        colors : list, optional
            Plot line color for each variable.
        alpha : float, optional
            Plot line alpha value.
        eer_idx : int, optional
            Classifier reference index for the Equal Error Rate.
        labels : bool, optional
            If True, will show plot labels.
        ax : axis, optional
            Plot Axis to use.

        Returns
        -------
        fig : Figure
            Figure object.

        """

        if ax is None:
            fig = plt.figure()
            ax = fig.add_subplot(111)
        else:
            fig = ax.figure

        if colors is None:
            x = np.linspace(0., 1., len(variables))
            colors = plt.get_cmap('rainbow')(x)

        if labels:
            for i, v in enumerate(variables):
                ax.plot(thresholds, rates[v], colors[i],
                        lw=lw,
                        alpha=alpha,
                        label=v)
        else:
            for i, v in enumerate(variables):
                ax.plot(thresholds, rates[v], colors[i], lw=lw, alpha=alpha)

        if eer_idx is not None:
            x, y = rates['EER'][eer_idx]
            ax.vlines(x, 0, 1, 'r', lw=lw)
            ax.set_title('EER = %0.2f %%' % (100. * y))

        return fig

    @staticmethod
    def plot_biometrics(assessment=None, eer_idx=None, path=None, show=False):
        """Create a summary plot of a biometrics test run.

        Parameters
        ----------
        assessment : dict
            Classification assessment results.
        eer_idx : int, optional
            Classifier reference index for the Equal Error Rate.
        path : str, optional
            If provided, the plot will be saved to the specified file.
        show : bool, optional
            If True, show the plot immediately.

        """

        fig = plt.figure()
        fig.suptitle('Biometrics Summary')

        c_sub = ['#008bff', '#8dd000']
        c_global = ['#0037ff', 'g']

        ths = assessment['thresholds']

        auth_ax = fig.add_subplot(121)
        id_ax = fig.add_subplot(122)

        # subject results
        for sub in six.iterkeys(assessment['subject']):
            auth_rates = assessment['subject'][sub]['authentication']['rates']
            _ = _plot_rates(ths, auth_rates, ['FAR', 'FRR'],
                            lw=MINOR_LW,
                            colors=c_sub,
                            alpha=0.4,
                            eer_idx=None,
                            labels=False,
                            ax=auth_ax)

            id_rates = assessment['subject'][sub]['identification']['rates']
            _ = _plot_rates(ths, id_rates, ['MR', 'RR'],
                            lw=MINOR_LW,
                            colors=c_sub,
                            alpha=0.4,
                            eer_idx=None,
                            labels=False,
                            ax=id_ax)

        # global results
        auth_rates = assessment['global']['authentication']['rates']
        _ = _plot_rates(ths, auth_rates, ['FAR', 'FRR'],
                        lw=MAJOR_LW,
                        colors=c_global,
                        alpha=1,
                        eer_idx=eer_idx,
                        labels=True,
                        ax=auth_ax)

        id_rates = assessment['global']['identification']['rates']
        _ = _plot_rates(ths, id_rates, ['MR', 'RR'],
                        lw=MAJOR_LW,
                        colors=c_global,
                        alpha=1,
                        eer_idx=eer_idx,
                        labels=True,
                        ax=id_ax)

        # set labels and grids
        auth_ax.set_xlabel('Threshold')
        auth_ax.set_ylabel('Authentication')
        auth_ax.grid()
        auth_ax.legend()

        id_ax.set_xlabel('Threshold')
        id_ax.set_ylabel('Identification')
        id_ax.grid()
        id_ax.legend()

        # make layout tight
        fig.tight_layout()

        # save to file
        if path is not None:
            path = utils.normpath(path)
            root, ext = os.path.splitext(path)
            ext = ext.lower()
            if ext not in ['png', 'jpg']:
                path = root + '.png'

            fig.savefig(path, dpi=200, bbox_inches='tight')

        # show
        if show:
            plt.show()
        else:
            # close
            plt.close(fig)

    @staticmethod
    def plot_clustering(data=None, clusters=None, path=None, show=False):
        """Create a summary plot of a data clustering.

        Parameters
        ----------
        data : array
            An m by n array of m data samples in an n-dimensional space.
        clusters : dict
            Dictionary with the sample indices (rows from `data`) for each cluster.
        path : str, optional
            If provided, the plot will be saved to the specified file.
        show : bool, optional
            If True, show the plot immediately.

        """

        fig = plt.figure()
        fig.suptitle('Clustering Summary')

        ymin, ymax = _yscaling(data, alpha=1.2)

        # determine number of clusters
        keys = list(clusters)
        nc = len(keys)

        if nc <= 4:
            nrows = 2
            ncols = 4
        else:
            area = nc + 4

            # try to fit to a square
            nrows = int(np.ceil(np.sqrt(area)))

            if nrows > MAX_ROWS:
                # prefer to increase number of columns
                nrows = MAX_ROWS

            ncols = int(np.ceil(area / float(nrows)))

        # plot grid
        gs = gridspec.GridSpec(nrows, ncols, hspace=0.2, wspace=0.2)

        # global axes
        ax_global = fig.add_subplot(gs[:2, :2])

        # cluster axes
        c_grid = np.ones((nrows, ncols), dtype='bool')
        c_grid[:2, :2] = False
        c_rows, c_cols = np.nonzero(c_grid)

        # generate color map
        x = np.linspace(0., 1., nc)
        cmap = plt.get_cmap('rainbow')

        for i, k in enumerate(keys):
            aux = data[clusters[k]]
            color = cmap(x[i])
            label = 'Cluster %s' % k
            ax = fig.add_subplot(gs[c_rows[i], c_cols[i]], sharex=ax_global)
            ax.set_ylim([ymin, ymax])
            ax.set_title(label)
            ax.grid()

            if len(aux) > 0:
                ax_global.plot(aux.T, color=color, lw=MINOR_LW, alpha=0.7)
                ax.plot(aux.T, color=color, lw=MAJOR_LW)

        ax_global.set_title('All Clusters')
        ax_global.set_ylim([ymin, ymax])
        ax_global.grid()

        # make layout tight
        gs.tight_layout(fig)

        # save to file
        if path is not None:
            path = utils.normpath(path)
            root, ext = os.path.splitext(path)
            ext = ext.lower()
            if ext not in ['png', 'jpg']:
                path = root + '.png'

            fig.savefig(path, dpi=200, bbox_inches='tight')

        # show
        if show:
            plt.show()
        else:
            # close
            plt.close(fig)

plotting = Plotting

In [None]:
# -*- coding: utf-8 -*-
"""
biosppy.signals.ecg
-------------------

This module provides methods to process Electrocardiographic (ECG) signals.
Implemented code assumes a single-channel Lead I like ECG signal.

:copyright: (c) 2015-2018 by Instituto de Telecomunicacoes
:license: BSD 3-clause, see LICENSE for more details.

"""

def ecg(signal=None, sampling_rate=1000., show=True):
    """Process a raw ECG signal and extract relevant signal features using
    default parameters.

    Parameters
    ----------
    signal : array
        Raw ECG signal.
    sampling_rate : int, float, optional
        Sampling frequency (Hz).
    show : bool, optional
        If True, show a summary plot.

    Returns
    -------
    ts : array
        Signal time axis reference (seconds).
    filtered : array
        Filtered ECG signal.
    rpeaks : array
        R-peak location indices.
    templates_ts : array
        Templates time axis reference (seconds).
    templates : array
        Extracted heartbeat templates.
    heart_rate_ts : array
        Heart rate time axis reference (seconds).
    heart_rate : array
        Instantaneous heart rate (bpm).

    """

    # check inputs
    if signal is None:
        raise TypeError("Please specify an input signal.")

    # ensure numpy
    signal = np.array(signal)

    sampling_rate = float(sampling_rate)

    # filter signal
    order = int(0.3 * sampling_rate)
    filtered, _, _ = st.filter_signal(signal=signal,
                                      ftype='FIR',
                                      band='bandpass',
                                      order=order,
                                      frequency=[3, 45],
                                      sampling_rate=sampling_rate)

    # segment
    rpeaks, = hamilton_segmenter(signal=filtered, sampling_rate=sampling_rate)

    # correct R-peak locations
    rpeaks, = correct_rpeaks(signal=filtered,
                             rpeaks=rpeaks,
                             sampling_rate=sampling_rate,
                             tol=0.05)

    # extract templates
    templates, rpeaks = extract_heartbeats(signal=filtered,
                                           rpeaks=rpeaks,
                                           sampling_rate=sampling_rate,
                                           before=0.2,
                                           after=0.4)

    # compute heart rate
    hr_idx, hr = st.get_heart_rate(beats=rpeaks,
                                   sampling_rate=sampling_rate,
                                   smooth=True,
                                   size=3)

    # get time vectors
    length = len(signal)
    T = (length - 1) / sampling_rate
    ts = np.linspace(0, T, length, endpoint=False)
    ts_hr = ts[hr_idx]
    ts_tmpl = np.linspace(-0.2, 0.4, templates.shape[1], endpoint=False)

    # plot
    if show:
        plotting.plot_ecg(ts=ts,
                          raw=signal,
                          filtered=filtered,
                          rpeaks=rpeaks,
                          templates_ts=ts_tmpl,
                          templates=templates,
                          heart_rate_ts=ts_hr,
                          heart_rate=hr,
                          path=None,
                          show=True)

    # output
    args = (ts, filtered, rpeaks, ts_tmpl, templates, ts_hr, hr)
    names = ('ts', 'filtered', 'rpeaks', 'templates_ts', 'templates',
             'heart_rate_ts', 'heart_rate')

    return utils.ReturnTuple(args, names)


def _extract_heartbeats(signal=None, rpeaks=None, before=200, after=400):
    """Extract heartbeat templates from an ECG signal, given a list of
    R-peak locations.

    Parameters
    ----------
    signal : array
        Input ECG signal.
    rpeaks : array
        R-peak location indices.
    before : int, optional
        Number of samples to include before the R peak.
    after : int, optional
        Number of samples to include after the R peak.

    Returns
    -------
    templates : array
        Extracted heartbeat templates.
    rpeaks : array
        Corresponding R-peak location indices of the extracted heartbeat
        templates.

    """

    R = np.sort(rpeaks)
    length = len(signal)
    templates = []
    newR = []

    for r in R:
        a = r - before
        if a < 0:
            continue
        b = r + after
        if b > length:
            break
        templates.append(signal[a:b])
        newR.append(r)

    templates = np.array(templates)
    newR = np.array(newR, dtype='int')

    return templates, newR


def extract_heartbeats(signal=None, rpeaks=None, sampling_rate=1000.,
                       before=0.2, after=0.4):
    """Extract heartbeat templates from an ECG signal, given a list of
    R-peak locations.

    Parameters
    ----------
    signal : array
        Input ECG signal.
    rpeaks : array
        R-peak location indices.
    sampling_rate : int, float, optional
        Sampling frequency (Hz).
    before : float, optional
        Window size to include before the R peak (seconds).
    after : int, optional
        Window size to include after the R peak (seconds).

    Returns
    -------
    templates : array
        Extracted heartbeat templates.
    rpeaks : array
        Corresponding R-peak location indices of the extracted heartbeat
        templates.

    """

    # check inputs
    if signal is None:
        raise TypeError("Please specify an input signal.")

    if rpeaks is None:
        raise TypeError("Please specify the input R-peak locations.")

    if before < 0:
        raise ValueError("Please specify a non-negative 'before' value.")
    if after < 0:
        raise ValueError("Please specify a non-negative 'after' value.")

    # convert delimiters to samples
    before = int(before * sampling_rate)
    after = int(after * sampling_rate)

    # get heartbeats
    templates, newR = _extract_heartbeats(signal=signal,
                                          rpeaks=rpeaks,
                                          before=before,
                                          after=after)

    return utils.ReturnTuple((templates, newR), ('templates', 'rpeaks'))


def compare_segmentation(reference=None, test=None, sampling_rate=1000.,
                         offset=0, minRR=None, tol=0.05):
    """Compare the segmentation performance of a list of R-peak positions
    against a reference list.

    Parameters
    ----------
    reference : array
        Reference R-peak location indices.
    test : array
        Test R-peak location indices.
    sampling_rate : int, float, optional
        Sampling frequency (Hz).
    offset : int, optional
        Constant a priori offset (number of samples) between reference and
        test R-peak locations.
    minRR : float, optional
        Minimum admissible RR interval (seconds).
    tol : float, optional
        Tolerance between corresponding reference and test R-peak
        locations (seconds).

    Returns
    -------
    TP : int
        Number of true positive R-peaks.
    FP : int
        Number of false positive R-peaks.
    performance : float
        Test performance; TP / len(reference).
    acc : float
        Accuracy rate; TP / (TP + FP).
    err : float
        Error rate; FP / (TP + FP).
    match : list
        Indices of the elements of 'test' that match to an R-peak
        from 'reference'.
    deviation : array
        Absolute errors of the matched R-peaks (seconds).
    mean_deviation : float
        Mean error (seconds).
    std_deviation : float
        Standard deviation of error (seconds).
    mean_ref_ibi : float
        Mean of the reference interbeat intervals (seconds).
    std_ref_ibi : float
        Standard deviation of the reference interbeat intervals (seconds).
    mean_test_ibi : float
        Mean of the test interbeat intervals (seconds).
    std_test_ibi : float
        Standard deviation of the test interbeat intervals (seconds).

    """

    # check inputs
    if reference is None:
        raise TypeError("Please specify an input reference list of R-peak \
                        locations.")

    if test is None:
        raise TypeError("Please specify an input test list of R-peak \
                        locations.")

    if minRR is None:
        minRR = np.inf

    sampling_rate = float(sampling_rate)

    # ensure numpy
    reference = np.array(reference)
    test = np.array(test)

    # convert to samples
    minRR = minRR * sampling_rate
    tol = tol * sampling_rate

    TP = 0
    FP = 0

    matchIdx = []
    dev = []

    for i, r in enumerate(test):
        # deviation to closest R in reference
        ref = reference[np.argmin(np.abs(reference - (r + offset)))]
        error = np.abs(ref - (r + offset))

        if error < tol:
            TP += 1
            matchIdx.append(i)
            dev.append(error)
        else:
            if len(matchIdx) > 0:
                bdf = r - test[matchIdx[-1]]
                if bdf < minRR:
                    # false positive, but removable with RR interval check
                    pass
                else:
                    FP += 1
            else:
                FP += 1

    # convert deviations to time
    dev = np.array(dev, dtype='float')
    dev /= sampling_rate
    nd = len(dev)
    if nd == 0:
        mdev = np.nan
        sdev = np.nan
    elif nd == 1:
        mdev = np.mean(dev)
        sdev = 0.
    else:
        mdev = np.mean(dev)
        sdev = np.std(dev, ddof=1)

    # interbeat interval
    th1 = 1.5  # 40 bpm
    th2 = 0.3  # 200 bpm

    rIBI = np.diff(reference)
    rIBI = np.array(rIBI, dtype='float')
    rIBI /= sampling_rate

    good = np.nonzero((rIBI < th1) & (rIBI > th2))[0]
    rIBI = rIBI[good]

    nr = len(rIBI)
    if nr == 0:
        rIBIm = np.nan
        rIBIs = np.nan
    elif nr == 1:
        rIBIm = np.mean(rIBI)
        rIBIs = 0.
    else:
        rIBIm = np.mean(rIBI)
        rIBIs = np.std(rIBI, ddof=1)

    tIBI = np.diff(test[matchIdx])
    tIBI = np.array(tIBI, dtype='float')
    tIBI /= sampling_rate

    good = np.nonzero((tIBI < th1) & (tIBI > th2))[0]
    tIBI = tIBI[good]

    nt = len(tIBI)
    if nt == 0:
        tIBIm = np.nan
        tIBIs = np.nan
    elif nt == 1:
        tIBIm = np.mean(tIBI)
        tIBIs = 0.
    else:
        tIBIm = np.mean(tIBI)
        tIBIs = np.std(tIBI, ddof=1)

    # output
    perf = float(TP) / len(reference)
    acc = float(TP) / (TP + FP)
    err = float(FP) / (TP + FP)

    args = (TP, FP, perf, acc, err, matchIdx, dev, mdev, sdev, rIBIm, rIBIs,
            tIBIm, tIBIs)
    names = ('TP', 'FP', 'performance', 'acc', 'err', 'match', 'deviation',
             'mean_deviation', 'std_deviation', 'mean_ref_ibi', 'std_ref_ibi',
             'mean_test_ibi', 'std_test_ibi',)

    return utils.ReturnTuple(args, names)


def correct_rpeaks(signal=None, rpeaks=None, sampling_rate=1000., tol=0.05):
    """Correct R-peak locations to the maximum within a tolerance.

    Parameters
    ----------
    signal : array
        ECG signal.
    rpeaks : array
        R-peak location indices.
    sampling_rate : int, float, optional
        Sampling frequency (Hz).
    tol : int, float, optional
        Correction tolerance (seconds).

    Returns
    -------
    rpeaks : array
        Cerrected R-peak location indices.

    Notes
    -----
    * The tolerance is defined as the time interval :math:`[R-tol, R+tol[`.

    """

    # check inputs
    if signal is None:
        raise TypeError("Please specify an input signal.")

    if rpeaks is None:
        raise TypeError("Please specify the input R-peaks.")

    tol = int(tol * sampling_rate)
    length = len(signal)

    newR = []
    for r in rpeaks:
        a = r - tol
        if a < 0:
            continue
        b = r + tol
        if b > length:
            break
        newR.append(a + np.argmax(signal[a:b]))

    newR = sorted(list(set(newR)))
    newR = np.array(newR, dtype='int')

    return utils.ReturnTuple((newR,), ('rpeaks',))


def ssf_segmenter(signal=None, sampling_rate=1000., threshold=20, before=0.03,
                  after=0.01):
    """ECG R-peak segmentation based on the Slope Sum Function (SSF).

    Parameters
    ----------
    signal : array
        Input filtered ECG signal.
    sampling_rate : int, float, optional
        Sampling frequency (Hz).
    threshold : float, optional
        SSF threshold.
    before : float, optional
        Search window size before R-peak candidate (seconds).
    after : float, optional
        Search window size after R-peak candidate (seconds).

    Returns
    -------
    rpeaks : array
        R-peak location indices.

    """

    # check inputs
    if signal is None:
        raise TypeError("Please specify an input signal.")

    # convert to samples
    winB = int(before * sampling_rate)
    winA = int(after * sampling_rate)

    Rset = set()
    length = len(signal)

    # diff
    dx = np.diff(signal)
    dx[dx >= 0] = 0
    dx = dx ** 2

    # detection
    idx, = np.nonzero(dx > threshold)
    idx0 = np.hstack(([0], idx))
    didx = np.diff(idx0)

    # search
    sidx = idx[didx > 1]
    for item in sidx:
        a = item - winB
        if a < 0:
            a = 0
        b = item + winA
        if b > length:
            continue

        r = np.argmax(signal[a:b]) + a
        Rset.add(r)

    # output
    rpeaks = list(Rset)
    rpeaks.sort()
    rpeaks = np.array(rpeaks, dtype='int')

    return utils.ReturnTuple((rpeaks,), ('rpeaks',))


def christov_segmenter(signal=None, sampling_rate=1000.):
    """ECG R-peak segmentation algorithm.

    Follows the approach by Christov [Chri04]_.

    Parameters
    ----------
    signal : array
        Input filtered ECG signal.
    sampling_rate : int, float, optional
        Sampling frequency (Hz).

    Returns
    -------
    rpeaks : array
        R-peak location indices.

    References
    ----------
    .. [Chri04] Ivaylo I. Christov, "Real time electrocardiogram QRS
       detection using combined adaptive threshold", BioMedical Engineering
       OnLine 2004, vol. 3:28, 2004

    """

    # check inputs
    if signal is None:
        raise TypeError("Please specify an input signal.")

    length = len(signal)

    # algorithm parameters
    v100ms = int(0.1 * sampling_rate)
    v50ms = int(0.050 * sampling_rate)
    v300ms = int(0.300 * sampling_rate)
    v350ms = int(0.350 * sampling_rate)
    v200ms = int(0.2 * sampling_rate)
    v1200ms = int(1.2 * sampling_rate)
    M_th = 0.4  # paper is 0.6

    # Pre-processing
    # 1. Moving averaging filter for power-line interference suppression:
    # averages samples in one period of the powerline
    # interference frequency with a first zero at this frequency.
    b = np.ones(int(0.02 * sampling_rate)) / 50.
    a = [1]
    X = ss.filtfilt(b, a, signal)
    # 2. Moving averaging of samples in 28 ms interval for electromyogram
    # noise suppression a filter with first zero at about 35 Hz.
    b = np.ones(int(sampling_rate / 35.)) / 35.
    X = ss.filtfilt(b, a, X)
    X, _, _ = st.filter_signal(signal=X,
                               ftype='butter',
                               band='lowpass',
                               order=7,
                               frequency=40.,
                               sampling_rate=sampling_rate)
    X, _, _ = st.filter_signal(signal=X,
                               ftype='butter',
                               band='highpass',
                               order=7,
                               frequency=9.,
                               sampling_rate=sampling_rate)

    k, Y, L = 1, [], len(X)
    for n in range(k + 1, L - k):
        Y.append(X[n] ** 2 - X[n - k] * X[n + k])
    Y = np.array(Y)
    Y[Y < 0] = 0

    # Complex lead
    # Y = abs(scipy.diff(X)) # 1-lead
    # 3. Moving averaging of a complex lead (the sintesis is
    # explained in the next section) in 40 ms intervals a filter
    # with first zero at about 25 Hz. It is suppressing the noise
    # magnified by the differentiation procedure used in the
    # process of the complex lead sintesis.
    b = np.ones(int(sampling_rate / 25.)) / 25.
    Y = ss.lfilter(b, a, Y)

    # Init
    MM = M_th * np.max(Y[:int(5 * sampling_rate)]) * np.ones(5)
    MMidx = 0
    M = np.mean(MM)
    slope = np.linspace(1.0, 0.6, int(sampling_rate))
    Rdec = 0
    R = 0
    RR = np.zeros(5)
    RRidx = 0
    Rm = 0
    QRS = []
    Rpeak = []
    current_sample = 0
    skip = False
    F = np.mean(Y[:v350ms])

    # Go through each sample
    while current_sample < len(Y):
        if QRS:
            # No detection is allowed 200 ms after the current one. In
            # the interval QRS to QRS+200ms a new value of M5 is calculated: newM5 = 0.6*max(Yi)
            if current_sample <= QRS[-1] + v200ms:
                Mnew = M_th * max(Y[QRS[-1]:QRS[-1] + v200ms])
                # The estimated newM5 value can become quite high, if
                # steep slope premature ventricular contraction or artifact
                # appeared, and for that reason it is limited to newM5 = 1.1*M5 if newM5 > 1.5* M5
                # The MM buffer is refreshed excluding the oldest component, and including M5 = newM5.
                Mnew = Mnew if Mnew <= 1.5 * MM[MMidx - 1] else 1.1 * MM[MMidx - 1]
                MM[MMidx] = Mnew
                MMidx = np.mod(MMidx + 1, 5)
                # M is calculated as an average value of MM.
                Mtemp = np.mean(MM)
                M = Mtemp
                skip = True
            # M is decreased in an interval 200 to 1200 ms following
            # the last QRS detection at a low slope, reaching 60 % of its
            # refreshed value at 1200 ms.
            elif current_sample >= QRS[-1] + v200ms and current_sample < QRS[-1] + v1200ms:
                M = Mtemp * slope[current_sample - QRS[-1] - v200ms]
            # After 1200 ms M remains unchanged.
            # R = 0 V in the interval from the last detected QRS to 2/3 of the expected Rm.
            if current_sample >= QRS[-1] and current_sample < QRS[-1] + (2 / 3.) * Rm:
                R = 0
            # In the interval QRS + Rm * 2/3 to QRS + Rm, R decreases
            # 1.4 times slower then the decrease of the previously discussed
            # steep slope threshold (M in the 200 to 1200 ms interval).
            elif current_sample >= QRS[-1] + (2 / 3.) * Rm and current_sample < QRS[-1] + Rm:
                R += Rdec
            # After QRS + Rm the decrease of R is stopped
            # MFR = M + F + R
        MFR = M + F + R
        # QRS or beat complex is detected if Yi = MFR
        if not skip and Y[current_sample] >= MFR:
            QRS += [current_sample]
            Rpeak += [QRS[-1] + np.argmax(Y[QRS[-1]:QRS[-1] + v300ms])]
            if len(QRS) >= 2:
                # A buffer with the 5 last RR intervals is updated at any new QRS detection.
                RR[RRidx] = QRS[-1] - QRS[-2]
                RRidx = np.mod(RRidx + 1, 5)
        skip = False
        # With every signal sample, F is updated adding the maximum
        # of Y in the latest 50 ms of the 350 ms interval and
        # subtracting maxY in the earliest 50 ms of the interval.
        if current_sample >= v350ms:
            Y_latest50 = Y[current_sample - v50ms:current_sample]
            Y_earliest50 = Y[current_sample - v350ms:current_sample - v300ms]
            F += (max(Y_latest50) - max(Y_earliest50)) / 1000.
        # Rm is the mean value of the buffer RR.
        Rm = np.mean(RR)
        current_sample += 1

    rpeaks = []
    for i in Rpeak:
        a, b = i - v100ms, i + v100ms
        if a < 0:
            a = 0
        if b > length:
            b = length
        rpeaks.append(np.argmax(signal[a:b]) + a)

    rpeaks = sorted(list(set(rpeaks)))
    rpeaks = np.array(rpeaks, dtype='int')

    return utils.ReturnTuple((rpeaks,), ('rpeaks',))


def engzee_segmenter(signal=None, sampling_rate=1000., threshold=0.48):
    """ECG R-peak segmentation algorithm.

    Follows the approach by Engelse and Zeelenberg [EnZe79]_ with the
    modifications by Lourenco *et al.* [LSLL12]_.

    Parameters
    ----------
    signal : array
        Input filtered ECG signal.
    sampling_rate : int, float, optional
        Sampling frequency (Hz).
    threshold : float, optional
        Detection threshold.

    Returns
    -------
    rpeaks : array
        R-peak location indices.

    References
    ----------
    .. [EnZe79] W. Engelse and C. Zeelenberg, "A single scan algorithm for
       QRS detection and feature extraction", IEEE Comp. in Cardiology,
       vol. 6, pp. 37-42, 1979
    .. [LSLL12] A. Lourenco, H. Silva, P. Leite, R. Lourenco and A. Fred,
       "Real Time Electrocardiogram Segmentation for Finger Based ECG
       Biometrics", BIOSIGNALS 2012, pp. 49-54, 2012

    """

    # check inputs
    if signal is None:
        raise TypeError("Please specify an input signal.")

    # algorithm parameters
    changeM = int(0.75 * sampling_rate)
    Miterate = int(1.75 * sampling_rate)
    v250ms = int(0.25 * sampling_rate)
    v1200ms = int(1.2 * sampling_rate)
    v1500ms = int(1.5 * sampling_rate)
    v180ms = int(0.18 * sampling_rate)
    p10ms = int(np.ceil(0.01 * sampling_rate))
    p20ms = int(np.ceil(0.02 * sampling_rate))
    err_kill = int(0.01 * sampling_rate)
    inc = 1
    mmth = threshold
    mmp = 0.2

    # Differentiator (1)
    y1 = [signal[i] - signal[i - 4] for i in range(4, len(signal))]

    # Low pass filter (2)
    c = [1, 4, 6, 4, 1, -1, -4, -6, -4, -1]
    y2 = np.array([np.dot(c, y1[n - 9:n + 1]) for n in range(9, len(y1))])
    y2_len = len(y2)

    # vars
    MM = mmth * max(y2[:Miterate]) * np.ones(3)
    MMidx = 0
    Th = np.mean(MM)
    NN = mmp * min(y2[:Miterate]) * np.ones(2)
    NNidx = 0
    ThNew = np.mean(NN)
    update = False
    nthfpluss = []
    rpeaks = []

    # Find nthf+ point
    while True:
        # If a previous intersection was found, continue the analysis from there
        if update:
            if inc * changeM + Miterate < y2_len:
                a = (inc - 1) * changeM
                b = inc * changeM + Miterate
                Mnew = mmth * max(y2[a:b])
                Nnew = mmp * min(y2[a:b])
            elif y2_len - (inc - 1) * changeM > v1500ms:
                a = (inc - 1) * changeM
                Mnew = mmth * max(y2[a:])
                Nnew = mmp * min(y2[a:])
            if len(y2) - inc * changeM > Miterate:
                MM[MMidx] = Mnew if Mnew <= 1.5 * MM[MMidx - 1] else 1.1 * MM[MMidx - 1]
                NN[NNidx] = Nnew if abs(Nnew) <= 1.5 * abs(NN[NNidx - 1]) else 1.1 * NN[NNidx - 1]
            MMidx = np.mod(MMidx + 1, len(MM))
            NNidx = np.mod(NNidx + 1, len(NN))
            Th = np.mean(MM)
            ThNew = np.mean(NN)
            inc += 1
            update = False
        if nthfpluss:
            lastp = nthfpluss[-1] + 1
            if lastp < (inc - 1) * changeM:
                lastp = (inc - 1) * changeM
            y22 = y2[lastp:inc * changeM + err_kill]
            # find intersection with Th
            try:
                nthfplus = np.intersect1d(np.nonzero(y22 > Th)[0], np.nonzero(y22 < Th)[0] - 1)[0]
            except IndexError:
                if inc * changeM > len(y2):
                    break
                else:
                    update = True
                    continue
            # adjust index
            nthfplus += int(lastp)
            # if a previous R peak was found:
            if rpeaks:
                # check if intersection is within the 200-1200 ms interval. Modification: 300 ms -> 200 bpm
                if nthfplus - rpeaks[-1] > v250ms and nthfplus - rpeaks[-1] < v1200ms:
                    pass
                # if new intersection is within the <200ms interval, skip it. Modification: 300 ms -> 200 bpm
                elif nthfplus - rpeaks[-1] < v250ms:
                    nthfpluss += [nthfplus]
                    continue
        # no previous intersection, find the first one
        else:
            try:
                aux = np.nonzero(y2[(inc - 1) * changeM:inc * changeM + err_kill] > Th)[0]
                bux = np.nonzero(y2[(inc - 1) * changeM:inc * changeM + err_kill] < Th)[0] - 1
                nthfplus = int((inc - 1) * changeM) + np.intersect1d(aux, bux)[0]
            except IndexError:
                if inc * changeM > len(y2):
                    break
                else:
                    update = True
                    continue
        nthfpluss += [nthfplus]
        # Define 160ms search region
        windowW = np.arange(nthfplus, nthfplus + v180ms)
        # Check if the condition y2[n] < Th holds for a specified
        # number of consecutive points (experimentally we found this number to be at least 10 points)"
        i, f = windowW[0], windowW[-1] if windowW[-1] < len(y2) else -1
        hold_points = np.diff(np.nonzero(y2[i:f] < ThNew)[0])
        cont = 0
        for hp in hold_points:
            if hp == 1:
                cont += 1
                if cont == p10ms - 1:  # -1 is because diff eats a sample
                    max_shift = p20ms  # looks for X's max a bit to the right
                    if nthfpluss[-1] > max_shift:
                        rpeaks += [np.argmax(signal[i - max_shift:f]) + i - max_shift]
                    else:
                        rpeaks += [np.argmax(signal[i:f]) + i]
                    break
            else:
                cont = 0

    rpeaks = sorted(list(set(rpeaks)))
    rpeaks = np.array(rpeaks, dtype='int')

    return utils.ReturnTuple((rpeaks,), ('rpeaks',))


def gamboa_segmenter(signal=None, sampling_rate=1000., tol=0.002):
    """ECG R-peak segmentation algorithm.

    Follows the approach by Gamboa.

    Parameters
    ----------
    signal : array
        Input filtered ECG signal.
    sampling_rate : int, float, optional
        Sampling frequency (Hz).
    tol : float, optional
        Tolerance parameter.

    Returns
    -------
    rpeaks : array
        R-peak location indices.

    """

    # check inputs
    if signal is None:
        raise TypeError("Please specify an input signal.")

    # convert to samples
    v_100ms = int(0.1 * sampling_rate)
    v_300ms = int(0.3 * sampling_rate)
    hist, edges = np.histogram(signal, 100, density=True)

    TH = 0.01
    F = np.cumsum(hist)

    v0 = edges[np.nonzero(F > TH)[0][0]]
    v1 = edges[np.nonzero(F < (1 - TH))[0][-1]]

    nrm = max([abs(v0), abs(v1)])
    norm_signal = signal / float(nrm)

    d2 = np.diff(norm_signal, 2)

    b = np.nonzero((np.diff(np.sign(np.diff(-d2)))) == -2)[0] + 2
    b = np.intersect1d(b, np.nonzero(-d2 > tol)[0])

    if len(b) < 3:
        rpeaks = []
    else:
        b = b.astype('float')
        rpeaks = []
        previous = b[0]
        for i in b[1:]:
            if i - previous > v_300ms:
                previous = i
                rpeaks.append(np.argmax(signal[int(i):int(i + v_100ms)]) + i)

    rpeaks = sorted(list(set(rpeaks)))
    rpeaks = np.array(rpeaks, dtype='int')

    return utils.ReturnTuple((rpeaks,), ('rpeaks',))


def hamilton_segmenter(signal=None, sampling_rate=1000.):
    """ECG R-peak segmentation algorithm.

    Follows the approach by Hamilton [Hami02]_.

    Parameters
    ----------
    signal : array
        Input filtered ECG signal.
    sampling_rate : int, float, optional
        Sampling frequency (Hz).

    Returns
    -------
    rpeaks : array
        R-peak location indices.

    References
    ----------
    .. [Hami02] P.S. Hamilton, "Open Source ECG Analysis Software
       Documentation", E.P.Limited, 2002

    """

    # check inputs
    if signal is None:
        raise TypeError("Please specify an input signal.")

    sampling_rate = float(sampling_rate)
    length = len(signal)
    dur = length / sampling_rate

    # algorithm parameters
    v1s = int(1. * sampling_rate)
    v100ms = int(0.1 * sampling_rate)
    TH_elapsed = np.ceil(0.36 * sampling_rate)
    sm_size = int(0.08 * sampling_rate)
    init_ecg = 8  # seconds for initialization
    if dur < init_ecg:
        init_ecg = int(dur)

    # filtering
    filtered, _, _ = st.filter_signal(signal=signal,
                                      ftype='butter',
                                      band='lowpass',
                                      order=4,
                                      frequency=25.,
                                      sampling_rate=sampling_rate)
    filtered, _, _ = st.filter_signal(signal=filtered,
                                      ftype='butter',
                                      band='highpass',
                                      order=4,
                                      frequency=3.,
                                      sampling_rate=sampling_rate)

    # diff
    dx = np.abs(np.diff(filtered, 1) * sampling_rate)

    # smoothing
    dx, _ = st.smoother(signal=dx, kernel='hamming', size=sm_size, mirror=True)

    # buffers
    qrspeakbuffer = np.zeros(init_ecg)
    noisepeakbuffer = np.zeros(init_ecg)
    peak_idx_test = np.zeros(init_ecg)
    noise_idx = np.zeros(init_ecg)
    rrinterval = sampling_rate * np.ones(init_ecg)

    a, b = 0, v1s
    all_peaks, _ = st.find_extrema(signal=dx, mode='max')
    for i in range(init_ecg):
        peaks, values = st.find_extrema(signal=dx[a:b], mode='max')
        try:
            ind = np.argmax(values)
        except ValueError:
            pass
        else:
            # peak amplitude
            qrspeakbuffer[i] = values[ind]
            # peak location
            peak_idx_test[i] = peaks[ind] + a

        a += v1s
        b += v1s

    # thresholds
    ANP = np.median(noisepeakbuffer)
    AQRSP = np.median(qrspeakbuffer)
    TH = 0.475
    DT = ANP + TH * (AQRSP - ANP)
    DT_vec = []
    indexqrs = 0
    indexnoise = 0
    indexrr = 0
    npeaks = 0
    offset = 0

    beats = []

    # detection rules
    # 1 - ignore all peaks that precede or follow larger peaks by less than 200ms
    lim = int(np.ceil(0.2 * sampling_rate))
    diff_nr = int(np.ceil(0.045 * sampling_rate))
    bpsi, bpe = offset, 0

    for f in all_peaks:
        DT_vec += [DT]
        # 1 - Checking if f-peak is larger than any peak following or preceding it by less than 200 ms
        peak_cond = np.array((all_peaks > f - lim) * (all_peaks < f + lim) * (all_peaks != f))
        peaks_within = all_peaks[peak_cond]
        if peaks_within.any() and (max(dx[peaks_within]) > dx[f]):
            continue

        # 4 - If the peak is larger than the detection threshold call it a QRS complex, otherwise call it noise
        if dx[f] > DT:
            # 2 - look for both positive and negative slopes in raw signal
            if f < diff_nr:
                diff_now = np.diff(signal[0:f + diff_nr])
            elif f + diff_nr >= len(signal):
                diff_now = np.diff(signal[f - diff_nr:len(dx)])
            else:
                diff_now = np.diff(signal[f - diff_nr:f + diff_nr])
            diff_signer = diff_now[diff_now > 0]
            if len(diff_signer) == 0 or len(diff_signer) == len(diff_now):
                continue
            # RR INTERVALS
            if npeaks > 0:
                # 3 - in here we check point 3 of the Hamilton paper
                # that is, we check whether our current peak is a valid R-peak.
                prev_rpeak = beats[npeaks - 1]

                elapsed = f - prev_rpeak
                # if the previous peak was within 360 ms interval
                if elapsed < TH_elapsed:
                    # check current and previous slopes
                    if prev_rpeak < diff_nr:
                        diff_prev = np.diff(signal[0:prev_rpeak + diff_nr])
                    elif prev_rpeak + diff_nr >= len(signal):
                        diff_prev = np.diff(signal[prev_rpeak - diff_nr:len(dx)])
                    else:
                        diff_prev = np.diff(signal[prev_rpeak - diff_nr:prev_rpeak + diff_nr])

                    slope_now = max(diff_now)
                    slope_prev = max(diff_prev)

                    if (slope_now < 0.5 * slope_prev):
                        # if current slope is smaller than half the previous one, then it is a T-wave
                        continue
                if dx[f] < 3. * np.median(qrspeakbuffer):  # avoid retarded noise peaks
                    beats += [int(f) + bpsi]
                else:
                    continue

                if bpe == 0:
                    rrinterval[indexrr] = beats[npeaks] - beats[npeaks - 1]
                    indexrr += 1
                    if indexrr == init_ecg:
                        indexrr = 0
                else:
                    if beats[npeaks] > beats[bpe - 1] + v100ms:
                        rrinterval[indexrr] = beats[npeaks] - beats[npeaks - 1]
                        indexrr += 1
                        if indexrr == init_ecg:
                            indexrr = 0

            elif dx[f] < 3. * np.median(qrspeakbuffer):
                beats += [int(f) + bpsi]
            else:
                continue

            npeaks += 1
            qrspeakbuffer[indexqrs] = dx[f]
            peak_idx_test[indexqrs] = f
            indexqrs += 1
            if indexqrs == init_ecg:
                indexqrs = 0
        if dx[f] <= DT:
            # 4 - not valid
            # 5 - If no QRS has been detected within 1.5 R-to-R intervals,
            # there was a peak that was larger than half the detection threshold,
            # and the peak followed the preceding detection by at least 360 ms,
            # classify that peak as a QRS complex
            tf = f + bpsi
            # RR interval median
            RRM = np.median(rrinterval)  # initial values are good?

            if len(beats) >= 2:
                elapsed = tf - beats[npeaks - 1]

                if elapsed >= 1.5 * RRM and elapsed > TH_elapsed:
                    if dx[f] > 0.5 * DT:
                        beats += [int(f) + offset]
                        # RR INTERVALS
                        if npeaks > 0:
                            rrinterval[indexrr] = beats[npeaks] - beats[npeaks - 1]
                            indexrr += 1
                            if indexrr == init_ecg:
                                indexrr = 0
                        npeaks += 1
                        qrspeakbuffer[indexqrs] = dx[f]
                        peak_idx_test[indexqrs] = f
                        indexqrs += 1
                        if indexqrs == init_ecg:
                            indexqrs = 0
                else:
                    noisepeakbuffer[indexnoise] = dx[f]
                    noise_idx[indexnoise] = f
                    indexnoise += 1
                    if indexnoise == init_ecg:
                        indexnoise = 0
            else:
                noisepeakbuffer[indexnoise] = dx[f]
                noise_idx[indexnoise] = f
                indexnoise += 1
                if indexnoise == init_ecg:
                    indexnoise = 0

        # Update Detection Threshold
        ANP = np.median(noisepeakbuffer)
        AQRSP = np.median(qrspeakbuffer)
        DT = ANP + 0.475 * (AQRSP - ANP)

    beats = np.array(beats)

    r_beats = []
    thres_ch = 0.85
    adjacency = 0.05 * sampling_rate
    for i in beats:
        error = [False, False]
        if i - lim < 0:
            window = signal[0:i + lim]
            add = 0
        elif i + lim >= length:
            window = signal[i - lim:length]
            add = i - lim
        else:
            window = signal[i - lim:i + lim]
            add = i - lim
        # meanval = np.mean(window)
        w_peaks, _ = st.find_extrema(signal=window, mode='max')
        w_negpeaks, _ = st.find_extrema(signal=window, mode='min')
        zerdiffs = np.where(np.diff(window) == 0)[0]
        w_peaks = np.concatenate((w_peaks, zerdiffs))
        w_negpeaks = np.concatenate((w_negpeaks, zerdiffs))

        pospeaks = sorted(zip(window[w_peaks], w_peaks), reverse=True)
        negpeaks = sorted(zip(window[w_negpeaks], w_negpeaks))

        try:
            twopeaks = [pospeaks[0]]
        except IndexError:
            twopeaks = []
        try:
            twonegpeaks = [negpeaks[0]]
        except IndexError:
            twonegpeaks = []

        # getting positive peaks
        for i in range(len(pospeaks) - 1):
            if abs(pospeaks[0][1] - pospeaks[i + 1][1]) > adjacency:
                twopeaks.append(pospeaks[i + 1])
                break
        try:
            posdiv = abs(twopeaks[0][0] - twopeaks[1][0])
        except IndexError:
            error[0] = True

        # getting negative peaks
        for i in range(len(negpeaks) - 1):
            if abs(negpeaks[0][1] - negpeaks[i + 1][1]) > adjacency:
                twonegpeaks.append(negpeaks[i + 1])
                break
        try:
            negdiv = abs(twonegpeaks[0][0] - twonegpeaks[1][0])
        except IndexError:
            error[1] = True

        # choosing type of R-peak
        n_errors = sum(error)
        try:
            if not n_errors:
                if posdiv > thres_ch * negdiv:
                    # pos noerr
                    r_beats.append(twopeaks[0][1] + add)
                else:
                    # neg noerr
                    r_beats.append(twonegpeaks[0][1] + add)
            elif n_errors == 2:
                if abs(twopeaks[0][1]) > abs(twonegpeaks[0][1]):
                    # pos allerr
                    r_beats.append(twopeaks[0][1] + add)
                else:
                    # neg allerr
                    r_beats.append(twonegpeaks[0][1] + add)
            elif error[0]:
                # pos poserr
                r_beats.append(twopeaks[0][1] + add)
            else:
                # neg negerr
                r_beats.append(twonegpeaks[0][1] + add)
        except IndexError:
            continue

    rpeaks = sorted(list(set(r_beats)))
    rpeaks = np.array(rpeaks, dtype='int')

    return utils.ReturnTuple((rpeaks,), ('rpeaks',))

In [None]:
def extract_rri(signal, ir, CHUNK_DURATION):
    tm = np.arange(0, CHUNK_DURATION, step=1 / float(ir))  # TIME METRIC FOR INTERPOLATION

    filtered, _, _ = st.filter_signal(signal=signal, ftype="FIR", band="bandpass", order=int(0.3 * CHAT_FREQ),
                                      frequency=[3, 45], sampling_rate=CHAT_FREQ)
    (rpeaks,) = hamilton_segmenter(signal=filtered, sampling_rate=CHAT_FREQ)
    (rpeaks,) = correct_rpeaks(signal=filtered, rpeaks=rpeaks, sampling_rate=CHAT_FREQ, tol=0.05)

    if 4 < len(rpeaks) < 200:  # and np.max(signal) < 0.0015 and np.min(signal) > -0.0015:
        rri_tm, rri_signal = rpeaks[1:] / float(CHAT_FREQ), np.diff(rpeaks) / float(CHAT_FREQ)
        ampl_tm, ampl_signal = rpeaks / float(CHAT_FREQ), signal[rpeaks]
        rri_interp_signal = splev(tm, splrep(rri_tm, rri_signal, k=3), ext=1)
        amp_interp_signal = splev(tm, splrep(ampl_tm, ampl_signal, k=3), ext=1)

        return np.clip(rri_interp_signal, 0, 2) * 100, np.clip(amp_interp_signal, -0.001, 0.002) * 10000, True
    else:
        return np.zeros((CHAT_FREQ * CHAT_EPOCH_LENGTH)), np.zeros((CHAT_FREQ * CHAT_EPOCH_LENGTH)), False


def load_data(path):
    # demo = pd.read_csv("../misc/result.csv")
    # ahi = pd.read_csv(r"C:\Data\AHI.csv")
    # ahi_dict = dict(zip(ahi.Study, ahi.AHI))
    root_dir = os.path.expanduser(path)
    file_list = os.listdir(root_dir)
    length = len(file_list)

    ################################### Fold the data based on number of respiratory events #########################
    study_event_counts = [i for i in range(0, length)]
    folds = []
    for i in range(5):
        folds.append(study_event_counts[i::5])

    x = []
    y_apnea = []
    y_hypopnea = []
    counter = 0
    for idx, fold in enumerate(folds):
        first = True
        for patient in fold:
            rri_succ_counter = 0
            rri_fail_counter = 0
            counter += 1
            print(counter)
            # for study in glob.glob(PATH + patient[0] + "_*"):
            study_data = np.load(CHAT_PREPROCESSED_PATH + "\\" + file_list[patient - 1])

            signals = study_data['data']
            labels_apnea = study_data['labels_apnea']
            labels_hypopnea = study_data['labels_hypopnea']

            # identifier = study.split('\\')[-1].split('_')[0] + "_" + study.split('\\')[-1].split('_')[1]
            # demo_arr = demo[demo['id'] == identifier].drop(columns=['id']).to_numpy().squeeze()

            y_c = labels_apnea + labels_hypopnea
            neg_samples = np.where(y_c == 0)[0]
            pos_samples = list(np.where(y_c > 0)[0])
            ratio = len(pos_samples) / len(neg_samples)
            neg_survived = []
            for s in range(len(neg_samples)):
                if random.random() < ratio:
                    neg_survived.append(neg_samples[s])
            samples = neg_survived + pos_samples
            signals = signals[samples, :, :]
            labels_apnea = labels_apnea[samples]
            labels_hypopnea = labels_hypopnea[samples]

            data = np.zeros((signals.shape[0], CHAT_EPOCH_LENGTH * CHAT_FREQ, chat_s_count + 2))
            for i in range(signals.shape[0]):  # for each epoch
                # data[i, :len(demo_arr), -3] = demo_arr
                data[i, :, -1], data[i, :, -2], status = extract_rri(signals[i, CHAT_ECG_SIG, :], CHAT_FREQ,
                                                                     float(CHAT_EPOCH_LENGTH))

                if status:
                    rri_succ_counter += 1
                else:
                    rri_fail_counter += 1

                for j in range(chat_s_count):  # for each signal
                    data[i, :, j] = signals[i, CHAT_SIGS[j], :]

            if first:
                aggregated_data = data
                aggregated_label_apnea = labels_apnea
                aggregated_label_hypopnea = labels_hypopnea
                first = False
            else:
                aggregated_data = np.concatenate((aggregated_data, data), axis=0)
                aggregated_label_apnea = np.concatenate((aggregated_label_apnea, labels_apnea), axis=0)
                aggregated_label_hypopnea = np.concatenate((aggregated_label_hypopnea, labels_hypopnea), axis=0)
            print(rri_succ_counter, rri_fail_counter)

        x.append(aggregated_data)
        y_apnea.append(aggregated_label_apnea)
        y_hypopnea.append(aggregated_label_hypopnea)

    return x, y_apnea, y_hypopnea

In [None]:
x, y_apnea, y_hypopnea = load_data(CHAT_PREPROCESSED_PATH)

1
88 0
2
93 0
3
201 0
4
51 0
5
45 0
6
66 0
7
77 0
8
150 0
9
383 0
10
85 0
11
205 0
12
259 0
13
142 0


In [None]:
for i in range(5):
        print(x[i].shape, y_apnea[i].shape, y_hypopnea[i].shape)
        np.savez_compressed(CHAT_OUT_PATH + "_" + str(i), x=x[i], y_apnea=y_apnea[i], y_hypopnea=y_hypopnea[i])

(382, 3840, 17) (382,) (382,)
(162, 3840, 17) (162,) (162,)
(610, 3840, 17) (610,) (610,)
(290, 3840, 17) (290,) (290,)
(401, 3840, 17) (401,) (401,)
