# Data modification test

In [1]:
import numpy as np
import matplotlib.pyplot as plt

import mne
from mne.datasets.sleep_physionet.age import fetch_data

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import FunctionTransformer

In [2]:
ALICE, BOB = 0, 1

[alice_files, bob_files] = fetch_data(subjects=[ALICE, BOB], recording=[1])

raw_train = mne.io.read_raw_edf(alice_files[0], stim_channel='Event marker',
                                misc=['Temp rectal'], preload=True)
annot_train = mne.read_annotations(alice_files[1])

raw_train.set_annotations(annot_train, emit_warning=False)

# raw_train.plot(start=60, duration=60,
#                scalings=dict(eeg=1e-4, resp=1e3, eog=1e-4, emg=1e-7,
#                              misc=1e-1))

Using default location ~/mne_data for PHYSIONET_SLEEP...
Extracting EDF parameters from /home/anrath/mne_data/physionet-sleep-data/SC4001E0-PSG.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 7949999  =      0.000 ... 79499.990 secs...


0,1
Measurement date,"April 24, 1989 16:13:00 GMT"
Experimenter,Unknown
Digitized points,Not available
Good channels,"5 EEG, 1 misc, 1 Stimulus"
Bad channels,
EOG channels,Not available
ECG channels,Not available
Sampling frequency,100.00 Hz
Highpass,0.50 Hz
Lowpass,100.00 Hz


In [3]:
# raw_copy = raw_train.copy()
# func_data = raw_copy.apply_function(np.abs)
# func_data.get_data()

In [4]:
import sys
sys.path.append('../')

In [5]:
from peak_finder import PeakFinder as pf

t = np.arange(0, 3, 0.01)
signal_sin = np.sin(np.pi*t) - np.sin(0.5*np.pi*t)
mne_sin_peak_locs, mne_sin_peak_mags = mne.preprocessing.peak_finder(signal_sin) 

signal_eeg = raw_train.get_data()[0]
format_percent = lambda x, y: np.round(len(x)/len(y), 4)
mne_eeg_peak_locs, mne_eeg_peak_mags = mne.preprocessing.peak_finder(raw_train.get_data()[0])

def success_metrics(results, signal='eeg', string=""):
    if signal == 'eeg':
        signal = signal_eeg
        mne_peak_locs = mne_eeg_peak_locs
    elif signal == 'sin':
        signal = signal_sin
        mne_peak_locs = mne_sin_peak_locs

    common_peaks = np.intersect1d(results, mne_peak_locs)
    common_peaks_len = len(common_peaks)

    results_len = len(results)
    peak_to_signal_ratio = format_percent(results, signal)

    actual_to_predicted_peak_count_ratio = format_percent(results, mne_peak_locs)

    print(string + f"Peaks: {results_len} ({peak_to_signal_ratio}), Intersect Num: {common_peaks_len} ({actual_to_predicted_peak_count_ratio})")
    return None


Found 2 significant peaks
Found 29454 significant peaks


In [6]:
minimum_height = 4e-5
edges = ['rising', 'falling', 'both', None]
success_metrics(mne_eeg_peak_locs, signal='eeg', string="MNE: ")
for edge in edges:
    ind_eeg_peak_typing_finder = pf.peak_typing_finder(signal_eeg, minimum_height=minimum_height, minimum_distance=1, edge=edge)
    success_metrics(ind_eeg_peak_typing_finder, signal='eeg', string=f"Edge: {edge}, ")

print('\n')

success_metrics(mne_sin_peak_locs, signal='sin', string="MNE: ")
for edge in edges:
    ind_sin_peak_typing_finder = pf.peak_typing_finder(signal_sin, minimum_height=minimum_height, minimum_distance=1, edge=edge)
    success_metrics(ind_sin_peak_typing_finder, signal='sin', string=f"Edge: {edge}, ")

MNE: Peaks: 29454 (0.0037), Intersect Num: 29454 (1.0)
Edge: rising, Peaks: 161038 (0.0203), Intersect Num: 25484 (5.4674)
Edge: falling, Peaks: 161118 (0.0203), Intersect Num: 25323 (5.4702)
Edge: both, Peaks: 162401 (0.0204), Intersect Num: 25484 (5.5137)
Edge: None, Peaks: 159755 (0.0201), Intersect Num: 25323 (5.4239)


MNE: Peaks: 2 (0.0067), Intersect Num: 2 (1.0)
Edge: rising, Peaks: 2 (0.0067), Intersect Num: 2 (1.0)
Edge: falling, Peaks: 2 (0.0067), Intersect Num: 2 (1.0)
Edge: both, Peaks: 2 (0.0067), Intersect Num: 2 (1.0)
Edge: None, Peaks: 2 (0.0067), Intersect Num: 2 (1.0)


In [7]:
peaks_eeg = {}
success_metrics(mne_eeg_peak_locs, signal='eeg', string="MNE: ")
distances = [15, 35, 50, 100, 155]
for distance in distances:
    peaks_eeg[distance] = pf.naive_logical_find_peaks(signal_eeg, min_distance=distance)
    success_metrics(peaks_eeg[distance], signal='eeg', string=f"Distance: {distance}, ")

print('\n')

peaks_sin = {}
success_metrics(mne_sin_peak_locs, signal='sin', string="MNE: ")
for distance in distances:
    peaks_sin[distance] = pf.naive_logical_find_peaks(signal_sin, min_distance=distance)
    success_metrics(peaks_sin[distance], signal='sin', string=f"Distance: {distance}, ")

MNE: Peaks: 29454 (0.0037), Intersect Num: 29454 (1.0)
Distance: 15, Peaks: 188047 (0.0237), Intersect Num: 28906 (6.3844)
Distance: 35, Peaks: 87758 (0.011), Intersect Num: 27407 (2.9795)
Distance: 50, Peaks: 63595 (0.008), Intersect Num: 25730 (2.1591)
Distance: 100, Peaks: 34871 (0.0044), Intersect Num: 21150 (1.1839)
Distance: 155, Peaks: 26413 (0.0033), Intersect Num: 18193 (0.8968)


MNE: Peaks: 2 (0.0067), Intersect Num: 2 (1.0)
Distance: 15, Peaks: 2 (0.0067), Intersect Num: 2 (1.0)
Distance: 35, Peaks: 2 (0.0067), Intersect Num: 2 (1.0)
Distance: 50, Peaks: 2 (0.0067), Intersect Num: 2 (1.0)
Distance: 100, Peaks: 2 (0.0067), Intersect Num: 2 (1.0)
Distance: 155, Peaks: 2 (0.0067), Intersect Num: 2 (1.0)


In [8]:
success_metrics(mne_eeg_peak_locs, signal='eeg', string="MNE: ")
ind_eeg_naive_mathematical_find_peaks = pf.naive_mathematical_find_peaks(signal_eeg)
success_metrics(ind_eeg_naive_mathematical_find_peaks, signal='eeg')

print('\n')

success_metrics(mne_sin_peak_locs, signal='sin', string="MNE: ")
ind_sin_naive_mathematical_find_peaks = pf.naive_mathematical_find_peaks(signal_sin)
success_metrics(ind_sin_naive_mathematical_find_peaks, signal='sin')

MNE: Peaks: 29454 (0.0037), Intersect Num: 29454 (1.0)
Peaks: 2128087 (0.2677), Intersect Num: 29070 (72.2512)


MNE: Peaks: 2 (0.0067), Intersect Num: 2 (1.0)
Peaks: 2 (0.0067), Intersect Num: 1 (1.0)


In [9]:
def noisy_find_peaks(x0, thresh=None, extrema=1, verbose=None):
    """Noise-tolerant fast peak-finding algorithm.

    Parameters
    ----------
    x0 : 1d array
        A real vector from the maxima will be found (required).
    thresh : float | None
        The amount above surrounding data for a peak to be
        identified. Larger values mean the algorithm is more selective in
        finding peaks. If ``None``, use the default of
        ``(max(x0) - min(x0)) / 4``.
    extrema : {-1, 1}
        1 if maxima are desired, -1 if minima are desired
        (default = maxima, 1).
    %(verbose)s

    Returns
    -------
    peak_loc : array
        The indices of the identified peaks in x0.
    peak_mag : array
        The magnitude of the identified peaks.

    """
    x0 = np.asanyarray(x0)
    s = x0.size

    if x0.ndim >= 2 or s == 0:
        raise ValueError('The input data must be a non empty 1D vector')

    if thresh is None:
        thresh = (np.max(x0) - np.min(x0)) / 4
        logger.debug('Peak finder automatic threshold: %0.2g' % (thresh,))

    assert extrema in [-1, 1]

    if extrema == -1:
        x0 = extrema * x0  # Make it so we are finding maxima regardless

    dx0 = np.diff(x0)  # Find derivative
    # This is so we find the first of repeated values
    dx0[dx0 == 0] = -np.finfo(float).eps
    # Find where the derivative changes sign
    ind = np.where(dx0[:-1:] * dx0[1::] < 0)[0] + 1

    # Include endpoints in potential peaks and valleys
    x = np.concatenate((x0[:1], x0[ind], x0[-1:]))
    ind = np.concatenate(([0], ind, [s - 1]))
    del x0

    #  x only has the peaks, valleys, and endpoints
    length = x.size
    min_mag = np.min(x)

    if length > 2:  # Function with peaks and valleys

        # Set initial parameters for loop
        temp_mag = min_mag
        found_peak = False
        left_min = min_mag

        # Deal with first point a little differently since tacked it on
        # Calculate the sign of the derivative since we taked the first point
        # on it does not necessarily alternate like the rest.
        signDx = np.sign(np.diff(x[:3]))
        if signDx[0] <= 0:  # The first point is larger or equal to the second
            ii = -1
            if signDx[0] == signDx[1]:  # Want alternating signs
                x = np.concatenate((x[:1], x[2:]))
                ind = np.concatenate((ind[:1], ind[2:]))
                length -= 1

        else:  # First point is smaller than the second
            ii = 0
            if signDx[0] == signDx[1]:  # Want alternating signs
                x = x[1:]
                ind = ind[1:]
                length -= 1

        # Preallocate max number of maxima
        maxPeaks = int(np.ceil(length / 2.0))
        peak_loc = np.zeros(maxPeaks, dtype=np.int64)
        peak_mag = np.zeros(maxPeaks)
        c_ind = 0
        # Loop through extrema which should be peaks and then valleys
        while ii < (length - 1):
            ii += 1  # This is a peak
            # Reset peak finding if we had a peak and the next peak is bigger
            # than the last or the left min was small enough to reset.
            if found_peak and ((x[ii] > peak_mag[-1]) or
                               (left_min < peak_mag[-1] - thresh)):
                temp_mag = min_mag
                found_peak = False

            # Make sure we don't iterate past the length of our vector
            if ii == length - 1:
                break  # We assign the last point differently out of the loop

            # Found new peak that was lager than temp mag and threshold larger
            # than the minimum to its left.
            if (x[ii] > temp_mag) and (x[ii] > left_min + thresh):
                temp_loc = ii
                temp_mag = x[ii]

            ii += 1  # Move onto the valley
            # Come down at least thresh from peak
            if not found_peak and (temp_mag > (thresh + x[ii])):
                found_peak = True  # We have found a peak
                left_min = x[ii]
                peak_loc[c_ind] = temp_loc  # Add peak to index
                peak_mag[c_ind] = temp_mag
                c_ind += 1
            elif x[ii] < left_min:  # New left minima
                left_min = x[ii]

        # Check end point
        if (x[-1] > temp_mag) and (x[-1] > (left_min + thresh)):
            peak_loc[c_ind] = length - 1
            peak_mag[c_ind] = x[-1]
            c_ind += 1
        elif not found_peak and temp_mag > min_mag:
            # Check if we still need to add the last point
            peak_loc[c_ind] = temp_loc
            peak_mag[c_ind] = temp_mag
            c_ind += 1

        # Create output
        peak_inds = ind[peak_loc[:c_ind]]
        peak_mags = peak_mag[:c_ind]
    else:  # This is a monotone function where an endpoint is the only peak
        x_ind = np.argmax(x)
        peak_mags = x[x_ind]
        if peak_mags > (min_mag + thresh):
            peak_inds = ind[x_ind]
        else:
            peak_mags = []
            peak_inds = []

    # Change sign of data if was finding minima
    if extrema < 0:
        peak_mags *= -1.0

    # ensure output type array
    if not isinstance(peak_inds, np.ndarray):
        peak_inds = np.atleast_1d(peak_inds).astype('int64')

    if not isinstance(peak_mags, np.ndarray):
        peak_mags = np.atleast_1d(peak_mags).astype('float64')

    # Plot if no output desired
    if len(peak_inds) == 0:
        logger.info('No significant peaks found')
    else:
        logger.info('Found %d significant peak%s'
                    % (len(peak_inds), _pl(peak_inds)))

    return peak_inds, peak_mags