In [None]:
!pip install -U dask[complete] fastavro librosa noisereduce

In [None]:
import dask.bag as db
import numpy as np
from scipy.signal import stft
import pandas as pd

from dask.distributed import Client
import sys

In [None]:
def calc(avroDict):
    """Compute thresholded cyclic-coherence sums for every channel of one record.

    For each channel in ``avroDict['adc_signal_mv']`` the cyclic spectral
    coherence is estimated (fast spectral correlation, see ``getCS``). For each
    threshold in ``np.arange(0.01, 0.4, 0.01)`` the coherence values that exceed
    the threshold are summed over carrier frequency, giving one value per
    modulation frequency.

    Parameters
    ----------
    avroDict : dict
        One record. The keys read here are ``'samplerate'``, ``'adc_signal_mv'``
        (a sequence of per-channel signals), ``'id'`` and ``'timestamp'``.
        ``'id'`` is concatenated with a string, so it is presumably a ``str``
        (TODO confirm against the Avro schema).

    Returns
    -------
    dict
        ``{'res': EES, 'label': '<id>-<timestamp>'}`` where ``EES`` maps
        ``'alpha'`` to the modulation-frequency axis (taken from channel 0) and
        ``'<channel index>_<threshold>'`` to the summed, thresholded coherence.
        The threshold part of the key is ``str()`` of the NumPy float and is
        deliberately left unrounded so that column names of previously written
        results keep matching.
    """

    def getParams(fs, alphaMax=1000, df=100, fMin=5e+4):
        # alphaMax - max modulation frequency
        # df - carrier frequency resolution
        # fMin - min carrier frequency

        # STFT windows' hop (shift of the STFT)
        R = int(np.floor(fs / (2 * alphaMax)))
        # STFT window length
        Nw = int(fs / df)

        # Hanning window
        w = np.hanning(Nw)
        # Dirichlet kernel parameter
        P = int(np.round((Nw - 1) / (2 * R)))
        # Dirichlet kernel
        D = np.sum(
            [np.exp(2 * np.pi * 1j * p * (np.arange(Nw) - Nw / 2) / Nw) for p in np.arange(-P, P + 1)],
            axis=0
        )
        D = D.real
        return fMin, R, Nw, w, D

    def getCS(x, fs, approxCoh=True, normalization=False, params=None):
        # An implementation of the spectral correlation and spectral coherence according to
        # Borghesani, P., and J. Antoni. "A faster algorithm for the calculation of the fast spectral correlation."
        # Mechanical Systems and Signal Processing 111 (2018): 113-118.
        #
        # params - optional result of getParams(fs); lets the caller build the window and the
        #          Dirichlet kernel once and reuse them for several signals with the same fs.
        if params is None:
            params = getParams(fs)
        fMin, R, Nw, w, D = params

        # The normalization branch reads x.size, which a plain list does not have.
        # stft() performs the same conversion internally, so the results are unaffected.
        x = np.asarray(x)

        # STFT with the Hanning window and with the Hanning window multiplied by the Dirichlet kernel
        stftArgs = dict(fs=fs, nperseg=Nw, noverlap=Nw - R, nfft=Nw, return_onesided=True)
        X_w = stft(x, window=w, **stftArgs)[-1]
        f, _, X_w_d = stft(x, window=w * D, **stftArgs)

        # Carrier frequencies to keep; computed once and reused below
        carrierCond = f >= fMin

        if approxCoh:
            # save some computation time by removing the frequencies below fMin up front
            X_w = X_w[carrierCond, :]
            X_w_d = X_w_d[carrierCond, :]

        # Cyclic spectrum: FFT along the time axis, transposed so that rows are modulation frequencies
        CS = np.fft.fft(np.conjugate(X_w) * X_w_d, axis=1).T
        # Modulation frequency axis; only the non-negative half is kept
        alpha = np.fft.fftfreq(X_w_d.shape[1], R / fs)
        positiveAlphaCond = alpha >= 0
        CS = CS[positiveAlphaCond, :]

        if normalization:
            # Normalization is implemented, but the original author did not find that it
            # improves the results noticeably, so it is off by default.
            normalizingFactor = np.fft.fft((w**2) * D, int(R * (1 + (x.size - Nw) / R)))[:np.sum(positiveAlphaCond)]
            normalizingFactor *= fs * X_w_d.shape[1]
            CS = (CS.T / normalizingFactor).T
            normalizingFactor_abs = np.abs(normalizingFactor)
            # keep only modulation frequencies whose relative normalizing factor exceeds 0.95
            normalizingFactorCond = normalizingFactor_abs / np.max(normalizingFactor_abs) > 0.95
            CS = CS[normalizingFactorCond, :]
        else:
            normalizingFactorCond = np.ones(np.sum(positiveAlphaCond), dtype=bool)

        # Cyclic coherence
        CS_abs = np.abs(CS)
        if approxCoh:
            # approximate: normalize every row by the alpha = 0 row
            CCoh = CS_abs / CS_abs[0, :]
        else:
            # here alpha is still the full (unfiltered) fftfreq axis and f the full frequency axis
            inds = np.atleast_2d(np.arange(f.size)) - np.atleast_2d((np.arange(CS.shape[0]) * Nw) / (R * alpha.size)).T
            inds = inds.astype(int)
            CCoh = CS_abs / np.sqrt(CS_abs[0, :] * CS_abs[0, inds])
            CS = CS[:, carrierCond]
            CCoh = CCoh[:, carrierCond]

        alpha = alpha[positiveAlphaCond][normalizingFactorCond]
        f = f[carrierCond]

        return CS, CCoh, f, alpha

    fs = avroDict['samplerate']
    channels = avroDict['adc_signal_mv']

    # The window/kernel parameters depend only on fs, so build them once per record instead of
    # once per channel. Nothing is computed when there are no channels, as before.
    params = getParams(fs) if len(channels) else None
    thresholds = np.arange(0.01, 0.4, 0.01)

    EES = {}
    for iCh in range(len(channels)):
        _, CCoh, _, alpha = getCS(channels[iCh], fs, params=params)

        if iCh == 0:
            EES['alpha'] = alpha

        for thresh in thresholds:
            # zero out coherence values at or below the threshold, then sum over carrier frequency
            CCohThresh = CCoh * (CCoh > thresh)
            label = str(iCh) + '_' + str(thresh)
            EES[label] = CCohThresh.sum(axis=1)

    return {'res': EES, 'label': avroDict['id'] + '-' + str(avroDict['timestamp'])}

In [None]:
files = !gsutil ls gs://transformers-ae-us-central1/*.avro
files = [f for f in files if (('test' not in f) and ('lab' not in f))]
len(files)

In [None]:
# Maximum number of files handed to dask in one batch.
chunkLen = 200
# Consecutive slices of `files`, each at most chunkLen long (the last one may be shorter).
fileChunks = [files[start:start + chunkLen] for start in range(0, len(files), chunkLen)]
# Report the number of chunks, then the sizes of the first and the last chunk, one per line.
print(len(fileChunks))
print(*(len(fileChunks[edge]) for edge in (0, -1)), sep='\n')

In [None]:
# Process the file list chunk by chunk on a dask.distributed client that lives only for this cell.
with Client() as client:
    for iChunck, chank in enumerate(fileChunks):
        # progress indicator: chunk indices on a single line
        print(iChunck, end=', ')
        # blocksize=None is passed through to read_avro unchanged
        d = db.read_avro(chank, blocksize=None)
        # apply calc to every record and materialize the result as a DataFrame
        df = d.map(calc).to_dataframe().compute()
        # one parquet file per chunk, named after the chunk index
        outPath = 'gs://ctoo/gideon/res/EESvsThresh-{}.prqt'.format(iChunck)
        df.to_parquet(outPath)