In [1]:
import numpy as np
from scipy import io, signal, fft, interpolate
from scipy.signal import windows, detrend
from matplotlib import pyplot as plt

def dynamic_spectral(x, fs=1, nperseg=256, nfft=None, noverlap=None, nw=3, ntapers=None, detrend_method='constant'):
    """
    Compute the pair-wise cross-spectral density for all channels in an array using Slepian tapers. Adapted from
    the mtcsd function in the labbox Matlab toolbox (authors: Partha Mitra, Ken Harris).
    
    Parameters
    ----------
    x : ndarray
        2D array of signals across which to compute CSD, columns should correspond to channels
    fs : float (default = 1)
        sampling frequency
    nperseg : int, None (default = None)
        number of data points per segment, if None nperseg is set to 256
    nfft : int, None (default = None)
        number of points to include in scipy.fft.fft, if None nfft is set to 2 * nperseg, if nfft > nperseg data 
        will be zero-padded
    noverlap : int, None (default = None)
        amout of overlap between consecutive segments, if None noverlap is set to nperseg / 2
    nw : int (default = 3)
        time-frequency bandwidth for Slepian tapers, passed on to scipy.signal.windows.dpss
    ntapers : int, None (default = None)
        number of tapers, passed on to scipy.signal.windows.dpss, if None ntapers is set to nw * 2 - 1 (as 
        suggested by original authors)
    detrend_method : {'constant', 'linear'} (default = 'constant')
        method used by scipy.signal.detrend to detrend each segment
        
    Returns
    -------
    f : ndarray
        frequency bins
    csd : ndarray
        full cross-spectral density matrix
    """
        
    # set some default for parameters values     
    if nfft is None:
        nfft = nperseg * 2
    
    if noverlap is None:
        noverlap = nperseg / 2
        
    if ntapers is None:
        ntapers = 2 * nw - 1
        
    stepsize = nperseg - noverlap
    nsegs = int(np.floor(len(x) / stepsize))

    fftnorm = np.sqrt(2) # value taken from original matlab function
    csdnorm = ntapers
    
    # initialize csd matrix
    csd = np.zeros((nsegs, nfft), dtype='complex128')
    
    # get FFT frequency bins
    f = fft.fftfreq(nfft, 1/fs)
    
    # get tapers
    tapers = windows.dpss(nperseg, nw, Kmax=ntapers)

    # loop over segments
    for seg_ind in range(nsegs):
        
        # prepare segment
        i0 = int(seg_ind * stepsize)
        i1 = int(seg_ind * stepsize + nperseg)
        if i1 > len(x): # stop if segment is out of range of data
            break
        seg = x[i0:i1]
        seg = detrend(seg, type=detrend_method, axis=0)
    
        # apply tapers
        tapered_seg = np.full((len(tapers), seg.shape[0]), np.nan)
        for taper_ind, taper in enumerate(tapers):
            tapered_seg[taper_ind] = (seg.T * taper).T    
        
        # compute FFT for each channel-taper combination
        pxx = fft.fft(tapered_seg, n=nfft) / fftnorm
        
        csd[seg_ind] = (pxx[:,:] * np.conjugate(pxx[:, :])).sum(axis=0) / csdnorm
    
    return f, csd

In [5]:
# whitening
def autocovariance(X, p):
    '''
    calculate autocovariance for some data X
    p: time lag
    '''
    scale = len(X) - p
    autoCov = 0
    for i in np.arange(0, len(X)-p):
        autoCov += ((X[i-p]))*(X[i])
        
    return autoCov/scale
    

def stack_cov(X, p):
    '''
    return a autocovariance matrix based on the Yule-Walker equation
    from data X of order p
    '''
    
    Xt_out = []
    cov = [autocovariance(X, i) for i in range(p)]
    cov_reverse = [autocovariance(X, p-i-1) for i in range(p)]
    
    for i in range(p):
        left = cov_reverse[:i]
        right = cov[:p-i]
        Xt_out.extend(left)
        Xt_out.extend(right)
        
    return np.array(Xt_out).reshape(p, p)


def AR(p, X):
    '''
    estimate AR(p) coefficients a for data X by solving the Yule-
    Walker equation in matrix form
    
    p: order of AR model
    '''
    
    X_mat = stack_cov(X, p)
    
    Y = [autocovariance(X, i) for i in range(1, p+1)]
    inv_XX = np.linalg.inv(np.dot(X_mat.T, X_mat))
    XY = np.dot(X_mat, Y)
    a = np.dot(inv_XX, XY)
    
    return a


def predict(a, X):
    '''
    estimate signal with AR coefficients a for data X
    '''
    
    Xt_out = []
    
    for t in range(len(a), len(X)):
        
        temp = 0
        
        for k in range(len(a)):
            temp += a[k]*X[t-k]
        
        Xt_out.append(temp)
        
    return Xt_out

def sigma2(a, x0, x_prev):
    '''
    calculate sigma^2 from Yule-Walker equation
    x0: data at t = 0 from AR estimated signal
    x_prev: data at t = 0-p to t = 0-1 from original signal
    return the noise/innovation/error term at t = 0
    '''
    
    p = len(a)
    variance = 0
    for i, a_i in enumerate(a):
        variance += a_i * x_prev[len(x_prev)-i-1]
    
    sigma = x0 - variance
    
    return sigma

def whiten_signal(signal,ar_order):
    if ar_order == 0:
        return signal
    ar_coeffs = AR(ar_order, signal)
    ar_pred = predict(ar_coeffs, signal)
    return [sigma2(ar_coeffs, ar_pred[i], signal[i:i+ar_order]) for i in range(len(ar_pred))]

In [20]:
# load the data
# Data:  16 channels of hippocampal LFP time series from a file lfp_1shank.mat.
# Matrix lfps (1250Hz sampling rate samples x 16 channels) 

lfps = io.loadmat('ws_data_1shank.mat')['lfps'] # array of LFP data
fs = 1250 # sampling frequency

lfps = lfps[:fs*5,:]

# pick a channel 
white_lfps = np.array([whiten_signal(lfps[:,lfp],2) for lfp in range(lfps.shape[1])])

  autoCov += ((X[i-p]))*(X[i])


In [35]:
window_size = 128
f = None
sps = []
for white_lfp in white_lfps: 
    f,sp = dynamic_spectral(white_lfp,fs,window_size,nw=3)
    f = f[:int(len(f)/2)]
    sps.append(sp[:,np.where(f>30)[0][0]:np.where(f<120)[0][-1]])
sps = np.array(sps)
print(np.where(f>30)[0][0],np.where(f<120)[0][-1])

7 24


In [36]:
sps.shape

(16, 97, 17)