In [None]:
import math
import sys

import IPython
import IPython.display as ipd
import matplotlib.pylab as plt
import numpy as np
import pandas as pd

%reload_ext autoreload
%autoreload 2

%matplotlib inline
#%matplotlib notebook

from matplotlib import rcParams
rcParams["figure.max_open_warning"] = False

In [None]:
from wall_analysis import parse_calibration_experiments

fname = 'results/calibration.pkl'

try:
    df_total = pd.read_pickle(fname)
    print('read', fname)
except:
    print('could not read', fname)
    df_total = parse_calibration_experiments()
    pd.to_pickle(df_total, fname)
    print('saved as', fname)

In [None]:
df_total.tail()

In [None]:
# calculate psd for sweep without snr selection
from wall_analysis import filter_by_dicts, extract_linear_psd

dict_chosen = [{
    'snr': 0,
    'motors': 0,
    'source': 'sweep',
}]


# found manually
sweep_lines = {
    'sweep': dict(
        slope=4000 / 280,
        offset=400,
        delta=20,
    )
}

df_chosen = filter_by_dicts(df_total, dict_chosen)
for source, df in df_chosen.groupby('source'):
    for j, row in df.iterrows():
        spec = np.sum(np.abs(row.signals_f), axis=1) # sum over all mics

        fig, axs  = plt.subplots(1, 2)
        fig.set_size_inches(10, 5)
        fig.suptitle(f'experiment "{row.appendix}"')
        axs[0].pcolorfast(range(spec.shape[0]), row.frequencies, np.log10(spec.T))
        psd = extract_linear_psd(row.signals_f, row.frequencies, ax=axs[0], **sweep_lines[source])

        for i in range(psd.shape[0]):
            axs[1].semilogy(row.frequencies, psd[i], label=f'mic{i}')
        plt.legend(loc='upper left')

In [None]:
def extract_snr_psd(signals_f, frequencies_matrix, min_t=0, max_t=None, n_freq=1, ax=None):
    n_mics = signals_f.shape[1]
    all_frequencies = np.unique(frequencies_matrix.flatten())

    psd_dict = [{f:[] for f in all_frequencies} for i in range(n_mics)]


    if max_t is None:
        max_t = frequencies_matrix.shape[0]
        
    if ax is not None:
        ax.plot(frequencies_matrix[:, 0])
        ax.axvline(min_t, color='k')
        ax.axvline(max_t, color='k')

    for i_t in range(min_t, max_t): # n_times x n_freqs 
        # save strongest n_freq frequencies
        fs = frequencies_matrix[i_t, :n_freq]
        for i_mic in range(n_mics):
            for f_idx, f in enumerate(fs):
                psd_dict[i_mic][f].append(np.abs(signals_f[i_t, i_mic, f_idx]))
    return psd_dict

In [None]:
# calculate psd for sweep without snr selection
from wall_analysis import filter_by_dicts

dict_chosen = [{
    'snr': 1,
    'motors': 0,
}]

#from crazyflie_description_py.parameters import N_BUFFER, FS
#all_frequencies = list(np.round(np.fft.rfftfreq(N_BUFFER, 1/FS)))
#print(all_frequencies)

kwargs = {
    'sweep': {
        'min_t': 100,
        'max_t': 320
    },
    'sweep_buzzer': {
        'min_t': 430,
        'max_t': 1230
    }
}

# need this to assign list to this columns
df_total = df_total.assign(psd_dict=None)

df_chosen = filter_by_dicts(df_total, dict_chosen)
for source, df in df_chosen.groupby('source'):
    for j, row in df.iterrows():
        fig, ax  = plt.subplots()
        fig.set_size_inches(10, 5)
        psd_dict = extract_snr_psd(row.signals_f, row.frequencies_matrix, ax=ax, **kwargs[source])
        ax.grid(which='both')
        df_total.loc[j, 'psd_dict'] = psd_dict
df_total.tail()

In [None]:
import seaborn as sns
mic_idx = 0

df_chosen = filter_by_dicts(df_total, dict_chosen)
for source, df in df_chosen.groupby('source'):
    fig, axs = plt.subplots(len(df), sharey=True, sharex=True)
    fig.set_size_inches(10, 10)
    fig.suptitle(f'{source}"{row.appendix}"')
    
    for j_idx, (j, row) in enumerate(df.iterrows()):
        axs[j_idx].set_title(f'experiment "{row.appendix}"')
        
        if row.psd_dict is None:
            continue
            
        psd_here = row.psd_dict[mic_idx] # {f:[]
        
        for key, vals in psd_here.items():
            
            #sns.scatterplot(x=[f]*len(vals), y=vals)
            # color by timestamp? 
            if len(vals) > 0:
                vals = vals[2:-2]
            axs[j_idx].scatter([key]*len(vals), vals, c=range(len(vals)), cmap='inferno')
        axs[j_idx].set_yscale('log')

In [None]:

for source, df in df_chosen.groupby('source'):
    for j, row in df.iterrows():
        
        spec = np.sum(np.abs(row.signals_f), axis=1) # sum over all mics
        psd_dict = row.psd_dict

        figs, axs  = plt.subplots(n_mics, sharex=True, sharey=True)
        figs.set_size_inches(10, 10)
        fig, ax  = plt.subplots()
        fig.set_size_inches(10, 5)
        figs.suptitle(f'{source}, experiment "{row.appendix}"')
        for i in range(n_mics):
            f_list = []
            vals_list = []
            for f, vals in psd_dict[i].items():
                if len(vals) == 0:
                    continue
                else:
                    f_list.append(f)
                    vals_list.append(np.mean(vals))
                    axs[i].scatter(f, np.mean(vals), color=f'C{i}')
            axs[i].set_yscale('log')
            ax.plot(f_list, vals_list, color=f'C{i}', label=f'mic{i}')
        ax.set_yscale('log')
        ax.legend(loc='upper left')

## Notes for next steps

- investigate variance of measurements within each bin, and potentially discard the first and last measurements
- decide on dataset to use for calibration based on above, potentially take the combination of multiple measurements
- understand what exact frequencies buzzer is playing at, and if there is the "step" effect (as discussed with Adrien)  