In [1]:
def dfa(x, scale_lim=[5,9], scale_dens=0.32, show=False):
    """
    Detrended Fluctuation Analysis - measures power law scaling coefficient
    of the given signal *x*.
    More details about the algorithm you can find e.g. here:
    Hardstone, R. et al. Detrended fluctuation analysis: A scale-free 
    view on neuronal oscillations, (2012).
    Args:
    -----
      *x* : numpy.array
        one dimensional data vector
      *scale_lim* = [5,9] : list of length 2 
        boundaries of the scale, where scale means windows among which RMS
        is calculated. Numbers from list are exponents of 2 to the power
        of X, eg. [5,9] is in fact [2**5, 2**9].
        You can think of it that if your signal is sampled with F_s = 128 Hz,
        then the lowest considered scale would be 2**5/128 = 32/128 = 0.25,
        so 250 ms.
        mine: fs=1000, 2**5/1000=32/1000=0.032
      *scale_dens* = 0.25 : float
        density of scale divisions, eg. for 0.25 we get 2**[5, 5.25, 5.5, ... ] 
      *show* = False
        if True it shows matplotlib log-log plot.
    Returns:
    --------
      *scales* : numpy.array
        vector of scales (x axis)
      *fluct* : numpy.array
        fluctuation function values (y axis)
      *alpha* : float
        estimation of DFA exponent
    """
    # cumulative sum of data with substracted offset
    y = np.cumsum(x - np.mean(x))
    scales = (2**np.arange(scale_lim[0], scale_lim[1], scale_dens)).astype(np.int)
    fluct = np.zeros(len(scales))
    # computing RMS for each window
    for e, sc in enumerate(scales):
        fluct[e] = np.sqrt(np.mean(calcRms(y, sc)**2))
    # fitting a line to rms data
    coeff = np.polyfit(np.log2(scales), np.log2(fluct), 1)
    if show:
        fluctfit = 2**np.polyval(coeff,np.log2(scales))
        plt.loglog(scales, fluct, 'bo')
        plt.loglog(scales, fluctfit, 'r', label=r'$\alpha$ = %0.2f'%coeff[0])
        plt.title('DFA')
        plt.xlabel(r'$\log_{10}$(time window)')
        plt.ylabel(r'$\log_{10}$<F(t)>')
        plt.legend()
        plt.show()
    return scales, fluct, coeff[0]

In [2]:
def calcRms(x, scale):
    """
    windowed Root Mean Square (RMS) with linear detrending.
    
    Args:
    -----
      *x* : numpy.array
        one dimensional data vector
      *scale* : int
        length of the window in which RMS will be calculaed
    Returns:
    --------
      *rms* : numpy.array
        RMS data in each window with length len(x)//scale
    """
    # making an array with data divided in windows
    shape = (x.shape[0]//scale, scale)
    X = np.lib.stride_tricks.as_strided(x,shape=shape)
    # vector of x-axis points to regression
    scale_ax = np.arange(scale)
    rms = np.zeros(X.shape[0])
    for e, xcut in enumerate(X):
        coeff = np.polyfit(scale_ax, xcut, 1)
        xfit = np.polyval(coeff, scale_ax)
        # detrending and computing RMS of each window
        rms[e] = np.sqrt(np.mean((xcut-xfit)**2))
    return rms

In [3]:
import numpy as np
from scipy.signal import hilbert,butter, lfilter,welch
def butterBandpass(data, lower_limit_filter, upper_limit_filter, sampling_rate, order=4):
    """
    This func is for filtering signal between lower and upper bounds
    the methods are used from scipy.signal lib
    """
    nyquist_coeff = 0.5 * sampling_rate
    low_frequences_filter = lower_limit_filter / nyquist_coeff
    high_frequences_filter = upper_limit_filter / nyquist_coeff
    numerator_filter, denominator_filter = butter(order, 
                                                  [low_frequences_filter, high_frequences_filter],
                                                  btype='band')
    
    # based on numinator and denominator the filter signal ...                                            )
    filtered_signal = lfilter(numerator_filter, denominator_filter, data)
    return filtered_signal

In [4]:
#import mne
import pandas as pd
import numpy as np
import scipy.io
import scipy.signal as ss
#import pyedflib
from scipy.signal import hilbert,butter, lfilter,welch
#from scipy.integrate import simps
#from meegkit.asr import ASR
#from meegkit.utils.matrix import sliding_window
#from mne.time_frequency import psd_array_multitaper
#from neurodsp.utils.download import load_ndsp_data
sampling_rate = 250

In [5]:
num_subjects=np.array([97,98])

for ii in range(len(num_subjects)):
    data= np.load('s'+str(num_subjects[ii])+'_clean_resting_state_2nd.npy') 
    dfas_delta=np.empty(63)
    dfas_theta=np.empty(63)
    dfas_alpha=np.empty(63)
    dfas_beta=np.empty(63)
    dfas_h_beta=np.empty(63)
    dfas_beta1=np.empty(63)
    dfas_beta2=np.empty(63)
    dfas_beta3=np.empty(63)
    dfas_gamma1=np.empty(63)
    dfas_gamma2=np.empty(63)
    
    filter_signal_delta = butterBandpass(data, 1, 4, 250)
    filter_signal_theta = butterBandpass(data, 4, 8, 250)
    filter_signal_alpha = butterBandpass(data, 8, 12, 250)
    filter_signal_beta = butterBandpass(data, 12, 25, 250)
    filter_signal_h_beta = butterBandpass(data, 25, 30, 250)
    filter_signal_beta1 = butterBandpass(data, 12, 15, 250)
    filter_signal_beta2 = butterBandpass(data, 15, 18, 250)
    filter_signal_beta3 = butterBandpass(data, 18, 25, 250)
    filter_signal_gamma1 = butterBandpass(data, 30, 40, 250)
    filter_signal_gamma2 = butterBandpass(data, 40, 49, 250)
    
    for i in range (63):
            x_delta = np.abs(hilbert(filter_signal_delta[i]))
            scales, fluct, alpha_factor_delta = dfa(x_delta, show=0)
            dfas_delta[i]=alpha_factor_delta
        
            x_theta = np.abs(hilbert(filter_signal_theta[i]))
            scales, fluct, alpha_factor_theta= dfa(x_theta, show=0)
            dfas_theta[i]=alpha_factor_theta
        
            x_alpha= np.abs(hilbert(filter_signal_alpha[i]))
            scales, fluct, alpha_factor_alpha = dfa(x_alpha, show=0)
            dfas_alpha[i]=alpha_factor_alpha
        
            x_beta = np.abs(hilbert(filter_signal_beta[i]))
            scales, fluct, alpha_factor_beta = dfa(x_beta, show=0)
            dfas_beta[i]=alpha_factor_beta
        
            x_h_beta = np.abs(ss.hilbert(filter_signal_h_beta[i]))
            scales, fluct, alpha_factor_h_beta = dfa(x_h_beta, show=0)
            dfas_h_beta[i]=alpha_factor_h_beta
        
            x_beta1 = np.abs(hilbert(filter_signal_beta1[i]))
            scales, fluct, alpha_factor_beta1 = dfa(x_beta1, show=0)
            dfas_beta1[i]=alpha_factor_beta1
        
            x_beta2 = np.abs(hilbert(filter_signal_beta2[i]))
            scales, fluct, alpha_factor_beta2 = dfa(x_beta2, show=0)
            dfas_beta2[i]=alpha_factor_beta2
        
            x_beta3 = np.abs(hilbert(filter_signal_beta3[i]))
            scales, fluct, alpha_factor_beta3 = dfa(x_beta3, show=0)
            dfas_beta3[i]=alpha_factor_beta3
        
            x_gamma1 = np.abs(hilbert(filter_signal_gamma1[i]))
            scales, fluct, alpha_factor_gamma1 = dfa(x_gamma1, show=0)
            dfas_gamma1[i]=alpha_factor_gamma1
        
            x_gamma2 = np.abs(hilbert(filter_signal_gamma2[i]))
            scales, fluct, alpha_factor_gamma2 = dfa(x_gamma2, show=0)
            dfas_gamma2[i]=alpha_factor_gamma2
        
            dfa_all_band=[dfas_delta,dfas_theta,dfas_alpha,dfas_beta,dfas_h_beta,
                          dfas_beta1,dfas_beta2,dfas_beta3,dfas_gamma1,dfas_gamma2]                                                         
    np.save('s'+str(num_subjects[ii])+'_DFA_all_band_2nd', dfa_all_band)

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  scales = (2**np.arange(scale_lim[0], scale_lim[1], scale_dens)).astype(np.int)


KeyboardInterrupt: 