In [2]:
from mne.time_frequency import psd_array_multitaper


def sliding_window(data, size, stepsize=1, padded=False, axis=-1, copy=True):
    """
    Calculate a sliding window over a signal
    Parameters
    ----------
    data : numpy array
        The array to be slided over.
    size : int
        The sliding window size
    stepsize : int
        The sliding window stepsize. Defaults to 1.
    axis : int
        The axis to slide over. Defaults to the last axis.
    copy : bool
        Return strided array as copy to avoid sideffects when manipulating the
        output array.
    Returns
    -------
    data : numpy array
        A matrix where row in last dimension consists of one instance
        of the sliding window.
    Notes
    -----
    - Be wary of setting `copy` to `False` as undesired sideffects with the
      output values may occurr.
    Examples
    --------
    >>> a = numpy.array([1, 2, 3, 4, 5])
    >>> sliding_window(a, size=3)
    array([[1, 2, 3],
           [2, 3, 4],
           [3, 4, 5]])
    >>> sliding_window(a, size=3, stepsize=2)
    array([[1, 2, 3],
           [3, 4, 5]])
    See Also
    --------
    pieces : Calculate number of pieces available by sliding
    """
    modul = (data.shape[0] - size) % stepsize

    if padded == True:
        if  modul!= 0:
            data_append = np.array(list(repeat(0,modul)))
            data = np.concatenate((data,data_append),axis = None)
    #print(data)
    if axis >= data.ndim:
        raise ValueError(
            "Axis value out of range"
        )

    if stepsize < 1:
        raise ValueError(
            "Stepsize may not be zero or negative"
        )

    if size > data.shape[axis]:
        raise ValueError(
            "Sliding window size may not exceed size of selected axis"
        )

    shape = list(data.shape)
    shape[axis] = np.floor(data.shape[axis] / stepsize - size / stepsize + 1).astype(int)
    shape.append(size)


    strides = list(data.strides)
    strides[axis] *= stepsize
    strides.append(data.strides[axis])

    #print(strides)
    strided = np.lib.stride_tricks.as_strided(
        data, shape=shape, strides=strides
    )

    if copy:
        return strided.copy()
    else:
        return strided

        
def multitaper_psd(data_frame, sfreq, rms_interval = 1, step_size = 0.5, bandwidth = 4, freq_bands = [1,16], iEEGPCA = False):
    """
    Calculate multitaper power spectrum density. You need sliding_window function too.
    Parameters:
        data_frame: time series data frame (n_channel * n_times)
        sfreq: sampling frequency in Hz
        rms_interval: The time window size in s
        bandwidth: the bandwith for Multitaper. Bandwidth = s*f
        step_size: the step size for the moving window in s. THis parameter determines the frequency for the result.
        freq_bands: frequency band you are interested
        iEEGPCA: whether to PCA the data_frame.
    Output:
        bpvals_low: data frame containing the power spectrum estimate in shape (n_channel, n_times, n_freq)
        freqmulti: frequency distribution
        
    """
    raw_matrix_data = data_frame
    
    if iEEGPCA:
        pca = PCA(n_components=0.95)
        raw_matrix_data_sub = pca.fit_transform(np.transpose(raw_matrix_data))
        raw_matrix_data = np.transpose(raw_matrix_data_sub)
        del raw_matrix_data_sub,pca

    windowed_100_data = sliding_window(raw_matrix_data,size= int(rms_interval * sfreq),stepsize= int(step_size * sfreq))
    print("We obtained a windowed data of size",windowed_100_data.shape)
    
    psdmulti,freqmulti = psd_array_multitaper(windowed_100_data[:,0,:], sfreq, bandwidth = bandwidth, fmin = freq_bands[0], fmax = freq_bands[1],
                                                 normalization = "full", verbose = 0, n_jobs=4);
    print("First run okay, proceding to more loops")
    bpvals_low = np.zeros(shape = (windowed_100_data.shape[0],windowed_100_data.shape[1],len(freqmulti)))
    bpvals_low[:,0,:] = psdmulti
    
    for i in range(1,windowed_100_data.shape[1]):

        psdmulti,freqmulti = psd_array_multitaper(windowed_100_data[:,i,:], sfreq, bandwidth = bandwidth, fmin = freq_bands[0], fmax = freq_bands[1],
                                                 normalization = "full", verbose = 0, n_jobs=4);
         
        bpvals_low[:,i,:] = psdmulti

    return(bpvals_low,freqmulti)

In [4]:
import numpy as np
signal_0 = np.random.rand(11,18888) # Generate random signal
sfreq = 128 # sampling frequency


In [5]:
a11,a12 = multitaper_psd(signal_0, 128, rms_interval = 1, step_size = 0.5, bandwidth = 4, freq_bands = [1,16], iEEGPCA = False)

We obtained a windowed data of size (11, 294, 128)
First run okay, proceding to more loops


In [6]:
print(a11.shape)
print(a12)

(11, 294, 16)
[ 1.  2.  3.  4.  5.  6.  7.  8.  9. 10. 11. 12. 13. 14. 15. 16.]
