In [None]:
import numpy as np
import pandas as pd

from scipy.signal import iirfilter, lfilter, periodogram, stft, istft
from scipy.spatial.distance import seuclidean
from scipy.stats import chi2

import matplotlib.pyplot as plt
%matplotlib notebook
figsize = [9.5, 5]

In [None]:
file = 'data/TRF01_IW976-0032_1634216407.fthr'

In [None]:
def getParams(fs, alphaMax=2000, dfreq=1500, fMin=5e+4):
    # produces the parameters for the cyclic spectrum estimation
    
    # fs - samping rate [sample/sec]
    # alphaMax - max modulation frequency [Hz]
    # dfreq - carrier frequency resolution [Hz]
    # fMin - min carrier frequency [Hz]

    # STFT windows' hop [sample]
    R = int(np.floor(fs / (2 * alphaMax))) 
    # STFT window length [sample]
    Nw = int(fs / dfreq)

    # Hanning window
    w = np.hanning(Nw)
    # Dirichlet kernel parameter (oreder)
    P = int(np.round((Nw - 1) / (2 * R)))
    # Dirichlet kernel
    D = np.sum(
        [np.exp(2 * np.pi * 1j * p *(np.arange(Nw) - Nw / 2) / Nw) for p in np.arange(- P, P + 1)], 
        axis=0
    )
    D = D.real
    return fMin, R, Nw, w, D

In [None]:
def getCS(x, fs, approxCoh=True, normalization=False, alphaMax=2000, dfreq=1500): 
    # An implementation of the cyclic spectral correlation and cyclic spectral coherence according to 
    # Borghesani, P., and J. Antoni. "A faster algorithm for the calculation of the fast spectral correlation." 
    # Mechanical Systems and Signal Processing 111 (2018): 113-118.
    
    #inputs:
    # x - signal
    # fs - sampling rate [sample/sec]
    # approxCoh - neglect the deviation of the carrier freqeuncy due to the computation 
    # method for the estimation of the coherence Sxx(f-2 * alpha) = Sxx(f) 
    # normalization - calculation of the normalization factor
    # alphaMax - maximal modulation frequency [Hz]
    # dfreq - minimal carrier frequency [Hz]
    
    # comment: 
    # another approximation is applied always: S(f, alpha)=S(f-2*alpha, alpha)
    # this part should not affect the detection and it is mainly cosmetic
    
    fMin, R, Nw, w, D = getParams(fs, alphaMax=alphaMax, dfreq=dfreq)

    # STFT with with Hanning window 
    X_w = stft(x, fs=fs, window=w, nperseg=Nw, noverlap=Nw - R, nfft=Nw, return_onesided=True)[-1]
    # STFT with with Hanning multiplied by Dirichlet kernel window
    f, t, X_w_d = stft(x, fs=fs, window=w * D, nperseg=Nw, noverlap=Nw - R, nfft=Nw, return_onesided=True)

    if approxCoh:
        # here I save some computation time by removing the frequencies below fMin.
        X_w = X_w[f >= fMin, :-1]
        X_w_d = X_w_d[f >= fMin, :-1]

    # Cyclcic Spectrum
    CS = np.fft.fft(np.conjugate(X_w) * X_w_d, axis=1).T
    # Modulation frequency vector
    alpha = np.fft.fftfreq(X_w_d.shape[1], R / fs)
    # remove the negative modulation frequency part
    pistiveAlphaCond = alpha >= 0
    CS = CS[pistiveAlphaCond, :]
    alpha = alpha[pistiveAlphaCond]

    if normalization:
        # here I implemented the normalization but did not find it useful - the results' improvement is not impressive.
        normalizingFactor = np.fft.fft((w**2) * D, int(R * (1 + (x.size - Nw) / R)))[:np.sum(pistiveAlphaCond)]
        normalizingFactor *= fs * X_w_d.shape[1]
        CS = (CS.T / normalizingFactor).T
        normalizingFactor_abs = np.abs(normalizingFactor)
        normalizingFactorCond = normalizingFactor_abs / np.max(normalizingFactor_abs) > 0.95
        CS = CS[normalizingFactorCond, :]
    else:
        normalizingFactorCond = np.ones(np.sum(pistiveAlphaCond), dtype=bool)


    # Cyclic Coherence
    CS_abs = np.abs(CS)
    if approxCoh:
        # second approximation - neglect the impact of the modulation frquency on the spectrum
        CCoh = CS_abs / CS_abs[0, :]
    else:
        inds = np.atleast_2d(np.arange(f.size)) - np.atleast_2d((np.arange(CS.shape[0]) * Nw) / (R * alpha.size)).T
        inds = inds.astype(int)
        CCoh = CS_abs / np.sqrt(CS_abs[0, :] * CS_abs[0, inds])
        CS = CS[:, f >= fMin]
        CCoh = CCoh[:, f >= fMin]

    alpha = alpha[normalizingFactorCond]
    f = f[f >= fMin]

    return CS, CCoh, f, alpha

In [None]:
def getPvalEES(x, fs, alphaMax=2000, dfreq=1500, alphaLims=[0.01, 0.51]):
    #wrapper for getCS + extracter of the p-value between the 100Hz harmonics to the background
    
    #inputs:
    # x - signal
    # fs - sampling rate
    # alphaMax - max modulation frequency [Hz]
    # dfreq - carrier frequency resolution [Hz]
    # alphaLims - relative limits (0:1) of the feature extractor with respect to the modulation frequency 

    CS, CCoh, f, alpha = getCS(x, fs, alphaMax=alphaMax, dfreq=dfreq, normalization=False)
    CCoh_abs_sqrt = np.abs(CCoh.T)**2

    # cutting out with respect to the limits of the modulation frequency
    edgeCond = (alpha > alphaMax * alphaLims[0]) & (alpha < alphaMax * alphaLims[1])
    CCoh_abs_sqrt = CCoh_abs_sqrt[:, edgeCond]
    alpha = alpha[edgeCond]

    #Enhenced Envelope Spectrum
    EES = CCoh_abs_sqrt.sum(axis=0) 

    # condition for harmonics of 100Hz
    cond100 = np.mod(alpha, 100) == 0
    cond360 = alpha > 360
    EES_PD = EES[cond100 & cond360]
    EESnot100 = EES[~cond100]

    # standardized Euclidean distance between the distribution of the background spectrum to the 100Hz harmonics
    SED = seuclidean(
        EES_PD, 
        EESnot100.mean() * np.ones_like(EES_PD), 
        EESnot100.var() * np.ones_like(EES_PD)
    )
    
    pValue = chi2(EES_PD.size).sf(SED)
    
    return pValue, CCoh_abs_sqrt, alpha, f, EES

In [None]:
df = pd.read_feather(file)

channNames = df.columns
channNames = channNames[channNames.str.contains('adc_signal_mv')]
    
for ch in channNames:
    print(file.split('/')[-1].split('.')[0] + ': ' + ch)
    x = df[ch].values
    fs = 1000 / df.time_ms[1]
    pValue, CCoh_abs_sqrt, alpha, f, EES = getPvalEES(x, fs)
    print("P-value={:.2e}".format(pValue))

    plt.figure(figsize=figsize)
    plt.pcolormesh(alpha, f, CCoh_abs_sqrt, shading='auto', vmax=np.percentile(CCoh_abs_sqrt, 99.8))
    plt.xlabel('Modulation frequency [Hz]')
    plt.ylabel('Carrier Frequency [Hz]')
#     plt.title(file.split('/')[-1][:-5] + ': Spectal cyclic coherence; channel ' + str(ch))
    plt.xticks(np.arange(400, alpha[-1], 50))
    plt.colorbar()
    plt.show()

    plt.figure(figsize=figsize)
    plt.plot(alpha, EES)
    plt.xlabel('Modulation frequency [Hz]')
#     plt.title(file.split('/')[-1][:-5]  + ': channel ' + str(ch) + '; p-value={:.3f}'.format(pValue))
#     plt.xticks(np.arange(400, alpha[-1], 50))
    plt.ylabel('EES')
    plt.grid()
    plt.show()