## Import and Functions

In [1]:
import ipywidgets
# import pyvisa
# from pyvisa import ResourceManager
from scipy.fft import fft, fftfreq, ifft

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
import scipy

import plotly
import plotly.graph_objects as go

import nidaqmx
from nidaqmx import system
# https://docs.bokeh.org/en/latest/docs/examples/interaction/tools/range_tool.html
import json
import cmath
from ipywidgets import interact,fixed,widgets

from ipywidgets import Button, HBox, VBox,Label,Layout,GridspecLayout
from IPython.display import display,update_display,clear_output
import functools
from scipy.signal import spectrogram
from scipy.signal import windows

import cmath

In [2]:
# Affichage des systèmes détectés
system = system.System.local()
print(system.driver_version)
for device in system.devices:
    print(device)

DriverVersion(major_version=23, minor_version=5, update_version=0)
Device(name=Dev1)


In [84]:
#attribut of param
class Parameters:
    def __init__(self) -> None:
        # Channels settings
        self.channel_in0 = "Dev1/ai0"
        self.channel_in1 = "Dev1/ai1"

        self.channel_out = "Dev1/ao0"

        self.small_fmin = 500
        self.small_fmax = 6400

        self.large_fmin = 50
        self.large_fmax = 1600
        
        self.min_volt = -0.5
        self.max_volt = 0.5

        self.sample_rate = 15000 

        self.calibration_cst = {'0':1,'1':1}
        self.H_ratio = {'large':1,'small':1,'combined':1} #combined 

        self.df = 1

    



In [85]:
def frequencies_limits(signal,av=True):
    if av:
        frequencies = signal.av_f
    else:
        frequencies = signal.f
            
    if signal.tube == 'large':
        frequencies_bool = (frequencies>=signal.param.large_fmin) & (frequencies <= signal.param.large_fmax)
    
    elif signal.tube == 'small':
        frequencies_bool = (frequencies>=signal.param.small_fmin) & (frequencies <= signal.param.small_fmax)

    else:
        frequencies_bool = (frequencies>=signal.param.large_fmin) & (frequencies <= signal.param.small_fmax)

    return frequencies_bool

In [86]:
def rms(x):
    return np.sqrt(np.mean(np.array(x)**2))

def SNR(x_noise,x_signal):
    SNR = 20*np.log(rms(x_signal)/rms(x_noise))
    return SNR


def average_smoothing(amplitudes,N=10):
    return np.hstack( [np.array(amplitudes[:int(N/2)]),np.array([np.mean(amplitudes[i-int(N/2):(i+1)+int(N/2)]) for i in range(int(N/2),len(amplitudes)-int(N/2))]),np.array( amplitudes[-int(N/2):])])

### Plotting Functions

In [87]:
# Computation
## Reflection coefficient

   
def plot_fft_m(list_signals,dB=False,fig=None,smooth=None):

    if fig == None:
        fig = go.Figure()
        fig_bool=None
    else:
        fig_bool=1

    for signal in list_signals:

        for j in [0,1]:
            # Fourier Transform
            amplitudes = abs(fft(signal.acquired_samples[j]))/len(signal.acquired_samples[j])  
            if dB:
                amplitudes = 20*np.log10(amplitudes/(2*10**-5))
            
            frequencies = fftfreq(len(signal.acquired_samples[j]), 1/signal.sample_rate)  
            if signal.tube == 'large':
                frequencies_bool = (frequencies>=signal.param.large_fmin) & (frequencies <= signal.param.large_fmax)
            elif signal.tube == 'small':
                frequencies_bool = (frequencies>=signal.param.small_fmin) & (frequencies <= signal.param.small_fmax)

            else:
                frequencies_bool = (frequencies>=signal.param.large_fmin) & (frequencies <= signal.param.small_fmax)
            if smooth:
                N = 10
                amplitudes = np.hstack( [np.array(amplitudes[:int(N/2)]),np.array([np.mean(amplitudes[i-int(N/2):(i+1)+int(N/2)]) for i in range(int(N/2),len(amplitudes)-int(N/2))]),np.array( amplitudes[-int(N/2):])])
            
            amplitudes[~frequencies_bool] = np.NAN
            fig.add_trace( go.Scatter(x=frequencies[frequencies>=0],y=amplitudes[frequencies>=0],name=f"{signal.name} Input {j + 1}")  )

        # Modify the title and axis name 
        if dB:
            fig.update_layout(
                title_text="Acquired signals Fourier Transform",xaxis_title="Frequency (Hz)", yaxis_title='Gain (dB)',
                legend_title='<b> Legends </b>'               
            )
        else:
            fig.update_layout(
                title_text="Acquired signals Fourier Transform",xaxis_title="Frequency (Hz)", yaxis_title='Amplitude (Pa)',
                legend_title='<b> Legends </b>'               
            )

        # Add a range slider
        fig.update_layout(
            xaxis=dict(
                rangeslider=dict(
                    visible=True
                ),
                type="linear"
            )
        )

    
    if fig_bool == None:

        fig.show()

        


### Genration and Acquisition functions

In [88]:

class TaskAcquisitionGeneration:

    def __init__(self,parameters:Parameters, measurement_caracteristics={'name':None,'tube':None,'material':None,'resonator':None}) -> None:
        
        # Channels settings
        self.channel_in0 = parameters.channel_in0
        self.channel_in1 = parameters.channel_in1
        self.channel_out = parameters.channel_out
        
        self.min_volt = parameters.min_volt
        self.max_volt = parameters.max_volt

        self.param = parameters

        self.signal = []
        self.acquired_samples = []
        self.seg = []

        self.stop_generation = False
        self.stop_acquisition = False

        self.name = measurement_caracteristics['name']
        self.tube = measurement_caracteristics['tube']
        self.material = measurement_caracteristics['material']
        self.resonator = measurement_caracteristics['resonator']

        self.sample_rate = parameters.sample_rate
        self.calibration_cst = parameters.calibration_cst
        self.H_ratio = parameters.H_ratio['large'] if self.tube == 'large' else parameters.H_ratio['small']

        self.f = []
        self.av_f = []
        self.av_fft = [] 
        self.c0 = 340
        self.k = 0
        self.s = 0.05 if self.tube == 'large' else 0.02 if self.tube == 'small' else 0 
        self.l = 0.15 if self.tube == 'large' else 0.037 if self.tube == 'small' else 0

        self.coh = []
        self.H12 = []


       
    # Task initialization
    ## Acquisition and Generation

    def __acquisition(self):
        
        ### Initialize and create a task for the acquisition of the signals
        # parameters :  channels name, min/max voltage, 
        #               sample rate, number of samples

        """A task represent a measure or a generation to be achieved.
        It is a set of one or more channels and their properties that includes the related clock (timing/sample rate) and other parameters (eg. min/max voltage) 
        """
        self.task = nidaqmx.Task()
        ### Add the name of the input channels and the voltage parameters
        self.task.ai_channels.add_ai_voltage_chan("Dev1/ai0", min_val=self.min_volt, max_val=self.max_volt)
        self.task.ai_channels.add_ai_voltage_chan("Dev1/ai1", min_val=self.min_volt, max_val=self.max_volt)
        
        #### configure the clock: arguments: rate = sampling rate, sample_mode = type of signal (finite),samps_per_chan = total number of samples to acquire
        self.task.timing.cfg_samp_clk_timing(self.sample_rate,sample_mode=nidaqmx.constants.AcquisitionType.FINITE,samps_per_chan=self.number_of_samples)
        #### set the function 'callback_function' to be called when the given number of samples has been acquired into the buffer
        self.task.register_every_n_samples_acquired_into_buffer_event(self.number_of_samples, self.callback_function)

        # set the function 'callback_function_stop' to be called once the acquisition task is finished
        self.task.register_done_event(self.callback_stop_acquisition)
        print('Acquisition defined')


    def __generation(self):
         ## Generation
        ### Initialize and create a task for the generation of the signal
        self.task_g = nidaqmx.Task()
        ## Add the name of the input channels and the voltage parameters
        self.task_g.ao_channels.add_ao_voltage_chan(self.channel_out, name_to_assign_to_channel='output', min_val=self.min_volt, max_val=self.max_volt)
        #### configure the clock
        self.task_g.timing.cfg_samp_clk_timing(self.sample_rate,sample_mode=nidaqmx.constants.AcquisitionType.FINITE,samps_per_chan=len(self.signal))
        #### write into the buffer the signal that will be transmitted
        self.task_g.write(self.signal)
        self.task_g.register_done_event(self.callable_stop_generation)
        print('Generation defined')




    def simple_acquisition(self,duration):
        # Create and start an acquisition withou signal generation
        # Useful for calibration and SNR calculation
        self.duration = duration
        self.number_of_samples = int(self.sample_rate * self.duration)  
        self.stop = False
        self.acquired_samples = []

        self.__acquisition()  

        self.task.start() 
        print('Acquisition started')
        while not self.stop_acquisition:
            pass
        self.task.close()
        print('Task closed')
        self.convert_and_filter_acquired_signal()
        self.convert()

        




    ## Generation
    def generation_sine_wave(self,duration,amplitude,frequence):

        #  Acquisition and Generation parameters
        self.duration = duration 

        ## number of samples to be acquired
        self.number_of_samples = int(self.sample_rate * self.duration) 
        ## initialization of the array that will store the acquired samples
        self.acquired_samples = []

        # Generation of the signal
        ## parameters: duration of the signal, amplitude of the generated signal
        self.frequence = frequence
        self.amplitude = amplitude 

        # time discretization and signal creation
        ## 1 sec is added to the duration so that the generated signal has more samples than the total number of samples to be acquired
        self.t = np.arange(0,(self.duration+1),1/(self.sample_rate))
        self.signal = self.amplitude*np.sin(2 * np.pi * self.frequence * self.t)

        ## Generation
        ### Initialize and create a task for the generation of the signal
        self.__generation()
        self.__acquisition()

        print('Tasks started')
        self.start_acquisition()
        while not self.stop_generation:
            pass
        self.close_task()
        print('Tasks closed')
        self.convert_and_filter_acquired_signal()
        self.convert()


        



    def creation_white_noise_frequency_band(self, amp = 0.25):
        # time discretization and signal creation
        ## 1 sec is added to the duration so that the generated signal has more samples than the total number of samples to be acquired
        self.t = np.arange(0,(self.duration+1),1/(self.sample_rate))
        mean = 0
        std = 1
        self.signal = np.random.normal(mean, std, size=len(self.t))
        self.signal /= np.max(self.signal) # normalization
        self.signal *= amp 

        order = 4
        min_freq,max_freq = 40,7000
        sos = scipy.signal.butter(order,[min_freq,max_freq],'bandpass',output='sos',fs=self.sample_rate)
        self.signal = scipy.signal.sosfilt(sos, self.signal)

    def generation_white_noise(self,duration ):

        #  Acquisition and Generation parameters
        self.duration = duration 
        ## number of samples to be acquired
        self.number_of_samples = int(self.sample_rate * self.duration) 
        ## initialization of the array that will store the acquired samples
        self.acquired_samples = []

        # Generation of the signal
        self.creation_white_noise_frequency_band() #create self.t and self.signal

        self.__generation()
        self.__acquisition()

        print('Tasks started')
        self.start_acquisition()
        while not self.stop_generation:
            pass
        self.close_task()
        print('Tasks closed')
        self.convert_and_filter_acquired_signal()
        self.convert()

    def measurement(self):
        self.generation_white_noise(30)

    # Calback functions
    def callback_function(self,task_handle, every_n_samples_event_type, number_of_samples, callback_data):
        print('Sample read')
        read = self.task.read(int(self.number_of_samples)) # read return a list [ [ ] ]
        self.acquired_samples += read



    # Tasks management
    def start_acquisition(self):
        # start the tasks
        self.acquired_samples = []
        self.stop_generation = False
        self.stop_acquisition = False

        self.task_g.start()
        self.task.start()


    def callable_stop_generation(self,task_handle, status, callback_data):
        self.task_g.stop
        print('Generation stop stoped')
        self.stop_generation = True  

    def callback_stop_acquisition(self,task_handle, status, callback_data):
        #stop the task
        self.task.stop
        print('Acquisition stoped')
        self.stop_acquisition = True


    def close_task(self):
        # close the tasks : they will need to be created again to be started
        self.task.close()
        self.task_g.close()


    def convert_and_filter_acquired_signal(self):
        for i in [0,1]:
            # self.acquired_samples[i] = np.array(self.acquired_samples[i])/self.calibration_cst[i] # conversion from voltage (V) to pressure (Pa)
            self.acquired_samples[i] = self.acquired_samples[i] - np.mean(self.acquired_samples[i]) # remove the offset from the signal
        self.convert_bool = False

    def convert(self):
        for i in [0,1]:
            self.acquired_samples[i] = np.array(self.acquired_samples[i])/self.calibration_cst[str(i)]
        self.convert_bool = True




    # Graphical display of the data

    def plot_acquired_signal(self,fig=None):
        # create a figure

        if fig == None:
            fig = go.Figure()
            fig_bool=None
        else:
            fig_bool=1
            
        # Add the 2 signals
        fig.add_trace( go.Scatter(x=np.arange(len(self.acquired_samples[0]))/self.sample_rate,y=self.acquired_samples[0],name=f"{self.name} M1",))
        fig.add_trace( go.Scatter(x=np.arange(len(self.acquired_samples[0]))/self.sample_rate,y=self.acquired_samples[1],name=f"{self.name} M2",))

        # Modify the title and axis name 
        fig.update_layout(
            title_text="Acquired signals",xaxis_title="Time (sec)", yaxis_title='Pressure (Pa)'               
        )

        # Add a range slider
        fig.update_layout(
            xaxis=dict(
                rangeslider=dict(
                    visible=True
                ),
                type="linear"
            )
        )

        if fig_bool == None:
            fig.show()


    def plot_generated_signal(self,fig_gs=None):
        # create a figure
        if fig_gs == None:
            fig_gs = go.Figure()
            fig_bool=None
        else:
            fig_bool=1

        # Add the 2 signals
        try:
            fig_gs.add_trace( go.Scatter(x=self.t,y=self.signal,name=f"{self.name} Output",))
        except:
            fig_gs.add_trace( go.Scatter(x=self.t,y=self.signal,name="Output",))


        # Modify the title and axis name 
        fig_gs.update_layout(
            title_text="Generated signals",xaxis_title="Time (sec)", yaxis_title='Amplitude (V)'               
        )

        # Add a range slider
        fig_gs.update_layout(
            xaxis=dict(
                rangeslider=dict(
                    visible=True
                ),
                type="linear"
            )
        )
        if fig_bool == None:
            fig_gs.show()

    def plot_fft(self,dB=False,fig=None):
        if fig == None:
            fig = go.Figure()
            fig_bool=None
        else:
            fig_bool=1
            

        for i in [0,1]:
            # Fourier Transform
            amplitudes = abs(fft(self.acquired_samples[i]))/len(self.acquired_samples[i])  # Transformée de fourier
            frequencies = fftfreq(len(self.acquired_samples[i]), 1/self.sample_rate)

            if self.tube == 'large':
                frequencies_bool = (frequencies>=self.param.large_fmin) & (frequencies <= self.param.large_fmax)
            elif self.tube == 'small':
                frequencies_bool = (frequencies>=self.param.small_fmin) & (frequencies <= self.param.small_fmax)

            else:
                frequencies_bool = (frequencies>=self.param.large_fmin) & (frequencies <= self.param.large_fmax)
                  # Fréquences de la transformée de Fourier

            if dB:
                amplitudes = 20*np.log10(amplitudes/(2*10**-5))
     
            fig.add_trace( go.Scatter(x=frequencies[frequencies_bool],y=amplitudes[frequencies_bool],name=f"{self.name} M{i + 1}",))


        # Modify the title and axis name 
        if dB:
            fig.update_layout(
            title_text="Acquired signals Fourier Transform",xaxis_title="Frequency (Hz)", yaxis_title='Gain (dB)'               
        )
        else : 
            fig.update_layout(
                title_text="Acquired signals Fourier Transform",xaxis_title="Frequency (Hz)", yaxis_title='Amplitude (Pa)'               
        )

        # Add a range slider
        fig.update_layout(
            xaxis=dict(
                rangeslider=dict(
                    visible=True
                ),
                type="linear"
            )
        )

        if fig_bool == None:
            fig.show()


    def plot_av_fft(self,dB=False,fig=None):
        if fig == None:
            fig = go.Figure()
            fig_bool=None
        else:
            fig_bool=1
            

        for i in [0,1]:
            # Fourier Transform
            amplitudes = self.av_fft[i]  # Transformée de fourier
            frequencies = self.av_f

            if self.tube == 'large':
                frequencies_bool = (frequencies>=self.param.large_fmin) & (frequencies <= self.param.large_fmax)
            elif self.tube == 'small':
                frequencies_bool = (frequencies>=self.param.small_fmin) & (frequencies <= self.param.small_fmax)

            else:
                frequencies_bool = (frequencies>=self.param.large_fmin) & (frequencies <= self.param.large_fmax)
                  # Fréquences de la transformée de Fourier

            if dB:
                amplitudes = 20*np.log10(abs(amplitudes)/(2*10**-5))
     
            fig.add_trace( go.Scatter(x=frequencies[frequencies_bool],y=abs(amplitudes[frequencies_bool]),name=f"{self.name} M{i + 1}",))


        # Modify the title and axis name 
        if dB:
            fig.update_layout(
            title_text="Acquired signals Fourier Transform",xaxis_title="Frequency (Hz)", yaxis_title='Gain (dB)'               
        )
        else : 
            fig.update_layout(
                title_text="Acquired signals Fourier Transform",xaxis_title="Frequency (Hz)", yaxis_title='Amplitude (Pa)'               
        )

        # Add a range slider
        fig.update_layout(
            xaxis=dict(
                rangeslider=dict(
                    visible=True
                ),
                type="linear"
            )
        )

        if fig_bool == None:
            fig.show()

    def plot_fft_generated_signal(self,dB=False):
        fig_fft = go.Figure()
        # Fourier Transform
        amplitudes = fft(self.signal/len(self.signal))  # Transformée de fourier
        frequencies = fftfreq(len(self.signal), 1/self.sample_rate)  # Fréquences de la transformée de Fourier
        if dB:
            amplitudes = 20*np.log10(amplitudes)

        if self.tube == 'large':
            frequencies_bool = (frequencies>=self.param.large_fmin) & (frequencies <= self.param.large_fmax)
        elif self.tube == 'small':
            frequencies_bool = (frequencies>=self.param.small_fmin) & (frequencies <= self.param.small_fmax)

        else:
            frequencies_bool = (frequencies>=self.param.large_fmin) & (frequencies <= self.param.large_fmax)
        # Add the signal
        fig_fft.add_trace( go.Scatter(x=frequencies[frequencies_bool],y=abs(amplitudes[frequencies_bool]),name=f"{self.name} Output",))

        # Modify the title and axis name 
        if dB:
            fig_fft.update_layout(
            title_text="Generated signal Fourier Transform",xaxis_title="Frequency (Hz)", yaxis_title='Gain (dB)'               
        )
        else : 
            fig_fft.update_layout(
                title_text="Generated signals Fourier Transform",xaxis_title="Frequency (Hz)", yaxis_title='Amplitude (V)'               
        )

        # Add a range slider
        fig_fft.update_layout(
            xaxis=dict(
                rangeslider=dict(
                    visible=True
                ),
                type="linear"
            )
        )
        fig_fft.show()


    def save_to_cjson(self,name):
        H_ratio_conv = {k:v if isinstance(v,int) else list([list(np.abs(v)),list(np.imag(v))]) for k,v in list(self.param.__dict__['H_ratio'].items()) }
        parameter = {k:v for k,v in list(self.param.__dict__.items())} # in order not to affect PARAMETERS, a copy is made
        parameter['H_ratio'] = H_ratio_conv
        
        data = {'generated': list(self.signal),
                'input': [list(s) for s in self.acquired_samples],
                'sample_rate': self.sample_rate,
                'duration': self.duration,
                'param': parameter,
                'caracteristic' : {'name':self.name,'tube':self.tube,'material':self.material,'resonator':self.resonator}
                }
        with open(f'{name}.json', 'w') as f:
            json.dump(data, f)


    def open_from_json(self,name):
        f = open(f'{name}.json')
        data = json.load(f)
        self.signal = data['generated']
        self.acquired_samples = data['input']
        self.sample_rate = data['sample_rate']
        self.param.H_ratio = {k:v if isinstance(v,int) else np.array([complex(i,j) for i,j in list(zip(v[0],v[1]))]) for k,v in list(data['param']['H_ratio'].items())}
        self.H_ratio = self.param.H_ratio['large'] if self.tube == 'large' else self.param.H_ratio['small']

        # [ complex(i,j) for i,j in list(zip(data['param']['H_ratio']['large'][0],data['param']['H_ratio']['large'][1]))]
        self.param.calibration_cst = data['param']['calibration_cst']
        self.param.df = data['param']['df']
        self.name = data['caracteristic']['name']
        self.tube = data['caracteristic']['tube']
        self.material = data['caracteristic']['material']
        self.resonator = data['caracteristic']['resonator']
        self.t = np.arange(0,(data['duration']+1),1/(data['sample_rate']))
        f.close()
            

In [89]:
class All_Signal():
    def __init__(self) -> None:
        self.all_signal = {}
        self.names = []

    def add_signal(self,parameters,name,tube,material,resonator):
        
        if name not in self.names:
            new_signal = TaskAcquisitionGeneration(parameters,measurement_caracteristics={'name':name,'tube':tube,'material':material,'resonator':resonator})
            self.all_signal[new_signal.name] = new_signal
            self.names += [new_signal.name]
        else:
            print('The signal already exists')

    def del_signal(self,name):
        if name in self.names:
            del self.all_signal[name]
            self.names = list(self.all_signal.keys())
        else:
            print('The signal does not exist')

#### Metrics computation

In [90]:
def spectro_average(signal,df):
     
    fe = signal.sample_rate
    # df = 1/t = fe/N  =>N = fe/df
    N = int(fe/df)
    overlap = int(N//8)
    signal.av_fft = [[],[]]
    signal.seg = [[],[]]

    for i in  [0,1]:
        f_ov, t_ov,Sxx_ov = spectrogram(np.array(signal.acquired_samples[i]),fe,mode='complex',scaling='spectrum',window='hann',nperseg=N,noverlap=overlap)
        Sxx_ov = Sxx_ov*2
        signal.seg[i] = np.transpose(Sxx_ov)
        Sxx_ov_av = np.mean(Sxx_ov,axis=1)
        signal.av_fft[i] = Sxx_ov_av
    
    signal.av_f = f_ov


def combine_signal(signal_large:TaskAcquisitionGeneration,signal_small:TaskAcquisitionGeneration):
    """Combine the the averaged fft of 2 signals

    Args:
        signal_large (TaskAcquisitionGeneration): _description_
        signal_small (TaskAcquisitionGeneration): _description_

    Returns:
        _type_: _description_
    """
    spectro_average(signal_large,signal_large.param.df)
    spectro_average(signal_small,signal_small.param.df)
    
    df = signal_large.av_f[1]
    nb_point = int((1600-500)/df)
    
    # Gaussian window to smooth the transitions
    w = windows.get_window(('gaussian',nb_point*2/6),nb_point*2)
    weigth = w[int(len(w)/2):]

    # select the right range for each signal
    frequencies_small = frequencies_limits(signal_small)
    frequencies_large = frequencies_limits(signal_large)
    signal_large_crop = signal_large.av_fft[0]
    signal_small_crop = signal_small.av_fft[0]
    signal_large_crop[~frequencies_large] = np.NaN
    signal_small_crop[~frequencies_small] = np.NAN

    signal_merged = [[],[]]

    for i in [0,1]:
        merge = (signal_large.av_fft[i][int(50/df):int(1600/df)][-int(nb_point):]* weigth  + signal_small.av_fft[i][int(500/df):int(6401/df)][:int(nb_point)]* weigth[::-1] )/(weigth[::-1]+weigth)
        signal_merged[i] = np.hstack([np.ones((50))*np.NAN,signal_large.av_fft[i][int(50/df):int(1600/df)-int(nb_point/df)], merge, signal_small.av_fft[i][signal_small.av_f>=500][int(nb_point/df):]])


    return signal_merged,signal_large.av_f




def combine_signal_seg(signal_large,signal_small,H12=False):
    """Combine the segment of the spectrogram of 2 signals

    Args:
        signal_large (_type_): _description_
        signal_small (_type_): _description_
        H12 (bool, optional): _description_. Defaults to False.

    Returns:
        _type_: _description_
    """
    
    df = signal_large.av_f[1]
    nb_point = int((1600-500)/df)

    # full scale frequencies
    frequencies = signal_large.av_f
    # crop scale frequencies

    def combine_seg(signal_large,signal_small):
    
        # Gaussian window to smooth the transitions
        w = windows.get_window(('gaussian',nb_point*2/6),nb_point*2)
        weigth = w[int(len(w)/2):]

        # select the right range for each signal
        frequencies_large = (frequencies>=50) & (frequencies <= 1600)
        frequencies_small = (frequencies>=500) & (frequencies <= 6400)

        signal_merged = [[],[]]

        merge = (signal_large[int(50/df):int(1600/df)][-int(nb_point):]* weigth  + signal_small[int(500/df):int(6401/df)][:int(nb_point)]* weigth[::-1] )/(weigth[::-1]+weigth)
        signal_merged = np.hstack([np.ones(int(50/df))*np.NAN,signal_large[frequencies_large][:-int(nb_point)-1], merge, signal_small[frequencies>=500][int(nb_point):]])

        return signal_merged
    
    signal_merged = [[],[]]

    for i in [0,1]:
        seg_merged = []
        for j in range(np.shape(signal_small.seg[i])[0]):
            seg_large = signal_large.seg[i][j,:]
            seg_small = signal_small.seg[i][j,:]
            seg_merged += [combine_seg(seg_large,seg_small)]
        signal_merged[i] = np.array(seg_merged)

    if H12 :
        H12_merged = combine_seg(signal_large.H_ratio,signal_small.H_ratio)
        return H12_merged

    return np.array(signal_merged),frequencies






def compute_H12(signal:TaskAcquisitionGeneration,av=True):
    if signal.tube !='combined':
        spectro_average(signal,signal.param.df)

    fft_0 = signal.seg[0][:,:]
    fft_1 = signal.seg[1][:,:]

    signal.k = 2*np.pi*signal.av_f/signal.c0
    signal.H12 = np.mean(signal.H_ratio*(fft_1/fft_0), axis = 0)


def plot_computation_ratio_H_m(list_signals,dB=False,fig=None):

    if fig == None:
        fig = go.Figure()
        fig_bool = False

    else:
        fig_bool = True
    
    for signal in list_signals:
        compute_H12(signal)
        frequencies_bool = frequencies_limits(signal,av=True)
        signal.H12[~frequencies_bool] = np.NAN

        if dB:
            fig.add_trace(go.Scatter(x=signal.av_f,y=20*np.log10(abs(signal.H12)/(2*10**-5)),name=f"{signal.name} H_12"))
            fig.update_layout(
            title_text="H ratio",xaxis_title="Frequency (Hz)", yaxis_title='Gain (dB)')
        else:
            fig.add_trace(go.Scatter(x=signal.av_f,y=abs(signal.H12),name=f"{signal.name} H_12"))
            fig.update_layout(
            title_text="H ratio",xaxis_title="Frequency (Hz)", yaxis_title='Amplitude (Pa)')
            
    # Add a range slider
    fig.update_layout(
        xaxis=dict(
            rangeslider=dict(
                visible=True
            ),
            type="linear"
        )
    )

    if fig_bool == False:
        fig.show()

def plot_av_fft_m(list_signals,dB=False,fig=None,smooth=None):

    if fig == None:
        fig = go.Figure()
        fig_bool=None
    else:
        fig_bool=1

    for signal in list_signals:
        if (signal.tube == 'large') or (signal.tube == 'small'):
            spectro_average(signal,signal.param.df)
            print('fft_ av')

        for i in [0,1]:
            # Fourier Transform
            amplitudes = abs(signal.av_fft[i])
            if dB:
                amplitudes = 20*np.log10(amplitudes/(2*10**-5))
            
            frequencies = np.array(signal.av_f)

            if signal.tube == 'large':
                frequencies_bool = (frequencies>=signal.param.large_fmin) & (frequencies <= signal.param.large_fmax)
            elif signal.tube == 'small':
                frequencies_bool = (frequencies>=signal.param.small_fmin) & (frequencies <= signal.param.small_fmax)
            else:
                frequencies_bool = (frequencies>=signal.param.large_fmin) & (frequencies <= signal.param.small_fmax)

            if smooth:
                N = 10
                amplitudes = np.hstack( [np.array(amplitudes[:int(N/2)]),np.array([np.mean(amplitudes[i-int(N/2):(i+1)+int(N/2)]) for i in range(int(N/2),len(amplitudes)-int(N/2))]),np.array( amplitudes[-int(N/2):])])
            
            amplitudes[~frequencies_bool] = np.NAN
            fig.add_trace( go.Scatter(x=frequencies[frequencies>=0],y=amplitudes[frequencies>=0],name=f"{signal.name} Input {i + 1}")  )

        # Modify the title and axis name 
        if dB:
            fig.update_layout(
                title_text="Acquired signals Fourier Transform",xaxis_title="Frequency (Hz)", yaxis_title='Gain (dB)',
                legend_title='<b> Legends </b>'               
            )
        else:
            fig.update_layout(
                title_text="Acquired signals Fourier Transform",xaxis_title="Frequency (Hz)", yaxis_title='Amplitude (Pa)',
                legend_title='<b> Legends </b>'               
            )

        # Add a range slider
        fig.update_layout(
            xaxis=dict(
                rangeslider=dict(
                    visible=True
                ),
                type="linear"
            )
        )

    
    if fig_bool == None:

        fig.show()
    

In [91]:
def computation_R_coefficient(sig):
    if sig.tube != 'combined':
        spectro_average(sig,sig.param.df)
        print('computation_R_coefficient spectro average', sig.param.df,'s',sig.s)

    f = sig.av_f
    mic1 = sig.seg[0][:,:]
    mic2 = sig.seg[1][:,:]
    
    try:
        H_ratio = sig.H_ratio
    except:
        print('No H ratio')
        H_ratio = 1
    sig.k =  2*np.pi*f/sig.c0
    H_ratio = 1 #A ENLEVER

    expiksn = np.repeat([ np.exp(-1j*sig.k*sig.s)],np.shape(sig.seg[0][:,:])[0],axis=0)
    expiksp = np.repeat([ np.exp(1j*sig.k*sig.s)],np.shape(sig.seg[0][:,:])[0],axis=0)
    R0 = (H_ratio*mic2/mic1 - expiksn)/(expiksp - H_ratio*mic2/mic1)
    sig.R = np.mean(np.array([np.exp(1j*2*sig.k*sig.l)] * R0 ),axis=0)
    
    
def computation_a_coefficient(self):
    computation_R_coefficient(self)
    self.a = 1 - abs(self.R)**2


def computation_Z_impedance(self):
    computation_R_coefficient(self)
    p0 = 1225
    # self.Z = self.c0*p0*(1+self.R)/(1-self.R)  
    self.Z = 428*(1+self.R)/(1-self.R)  


def plot_computation_R_a_Z_coefficient_m(list_signals,metric,fig=None):

    if fig == None:
        fig = go.Figure()
        fig_bool = True
    else:
        fig_bool = False
    ref = {'R':computation_R_coefficient,'a':computation_a_coefficient,'Z':computation_Z_impedance}
    func = ref[metric]

    for sig in list_signals:
        compute_H12(sig)
        func(sig)
        
        frequencies_bool = frequencies_limits(sig)
        y= abs(sig.R) if metric=='R'else sig.a if metric == 'a' else abs(sig.Z) 
        y[~frequencies_bool] = np.NaN

        fig.add_trace(go.Scatter(x=sig.av_f,y=y,name=f"{sig.name} {metric}"))
        fig.update_layout(
                title_text=f"{'R coefficient' if metric == 'R' else 'a coefficient' if metric == 'a' else 'Z impedance' if metric == 'Z' else ''}",xaxis_title="Frequency (Hz)", yaxis_title='Coefficient')

    fig.update_layout(
        xaxis=dict(
            rangeslider=dict(
                visible=True
            ),
            type="linear"
        )
    )
    
    if fig_bool :
        fig.show()


def compute_coherence(signal):
    if signal.tube != 'combined':
        spectro_average(signal,signal.param.df)
    S = lambda i,j: signal.seg[i][:,:]*signal.seg[j][:,:]
    signal.coh = np.nanmean( (S(0,1)*S(1,0))/(S(0,0)*S(1,1)), axis = 0)


def plot_computation_coherence_m(list_signals,fig=None):

    if fig == None:
        fig = go.Figure()
        fig_bool = True
    else:
        fig_bool = False

    for sig in list_signals:
        compute_coherence(sig)
        frequencies_bool = frequencies_limits(sig)
        y = sig.coh
        print(y)
        y[~frequencies_bool] = np.NaN
        fig.add_trace(go.Scatter(x=sig.av_f,y=abs(y),name=f"{sig.name} Coherence"))
        fig.update_layout(title_text=f"Coherence",xaxis_title="Frequency (Hz)", yaxis_title='Coefficient')

    if fig_bool :
        fig.show()


def combine_metric(signal_combined,metric):
    """Combine the metric(f) of two signals

    Args:
        signal_combined (_type_): _description_
        metric (_type_): _description_

    Returns:
        _type_: _description_
    """
    # retrive the signals that where used to create the combined signal
    signal_large,signal_small = signal_combined.signal_large, signal_combined.signal_small
    frequencies = signal_large.av_f
    df = signal_large.param.df
    nb_point = int((1600-500)/df)

    ref = {'R':computation_R_coefficient,
           'a':computation_a_coefficient,
           'Z':computation_Z_impedance,
           'fft':lambda x: spectro_average(x,df),
           'H12':compute_H12,
           'coh':compute_coherence}
    func = ref[metric]

    func(signal_large)
    func(signal_small)



    # Gaussian window to smooth the transitions
    w = windows.get_window(('gaussian',nb_point*2/6),nb_point*2)
    weigth = w[int(len(w)/2):]

    # select the right range for each signal
    frequencies_large = (frequencies>=50) & (frequencies <= 1600)
    frequencies_small = (frequencies>=500) & (frequencies <= 6400)

    signal_merged = [[],[]]

    if metric == 'R':
        signal_large_metric = signal_large.R
        signal_small_metric = signal_small.R

    elif metric == 'a':
        signal_large_metric = signal_large.a
        signal_small_metric = signal_small.a

    elif metric == 'Z':
        signal_large_metric = signal_large.Z
        signal_small_metric = signal_small.Z
    
    elif metric == 'H12':
        signal_large_metric = signal_large.H12
        signal_small_metric = signal_small.H12
    
    elif metric == 'coh':
        signal_large_metric = signal_large.coh
        signal_small_metric = signal_small.coh

    elif metric == 'fft':
        signal_large_metric = signal_large.av_fft
        signal_small_metric = signal_small.av_fft
        signal_merged = [[],[]]

        for i in [0,1]:
            merge = (signal_large_metric[i][int(50/df):int(1600/df)][-int(nb_point):]* weigth  + signal_small_metric[i][int(500/df):int(6401/df)][:int(nb_point)]* weigth[::-1] )/(weigth[::-1]+weigth)
            signal_merged[i] = np.hstack([np.ones(int(50/df))*np.NAN,signal_large_metric[i][frequencies_large][:-int(nb_point)-1], merge, signal_small_metric[i][frequencies>=500][int(nb_point):]])
        
        signal_combined.av_fft = signal_merged
        return signal_merged


    merge = (signal_large_metric[int(50/df):int(1600/df)][-int(nb_point):]* weigth  + signal_small_metric[int(500/df):int(6401/df)][:int(nb_point)]* weigth[::-1] )/(weigth[::-1]+weigth)
    signal_merged = np.hstack([np.ones(int(50/df))*np.NAN,signal_large_metric[frequencies_large][:-int(nb_point)-1], merge, signal_small_metric[frequencies>=500][int(nb_point):]])

    if metric == 'R':
        signal_combined.R = signal_merged
    elif metric == 'a':
        signal_combined.a = signal_merged
    elif metric == 'Z':
        signal_combined.Z = signal_merged
    elif metric == 'H12':
        signal_combined.H12 = signal_merged
    elif metric == 'coh':
        signal_combined.coh = signal_merged

    return signal_merged

    

### Interface Functions

#### Parameters

In [92]:
PARAMETERS = Parameters()
               
## create button to select the tube
# display the frequency

#### Calibration

In [95]:
class Calibration():
    def __init__(self,PARAMETERS:Parameters) -> None:
        self.PARAMETERS = PARAMETERS
        # self.Task_signal = TaskAcquisitionGeneration(self.PARAMETERS,measurement_caracteristics={'name':'SNR signal','tube':None,'material':None,'resonator':None})
        # self.Task_noise = TaskAcquisitionGeneration(self.PARAMETERS,measurement_caracteristics={'name':'SNR noise','tube':None,'material':None,'resonator':None})
        
        self.H_p = []
        self.H_np = []

        self.ratio_H = []
        self.tube = 'large'

        # Initialization widgets
        self.init_widget()
        self.init_label_setup()
        self.init_widget_plot()

        # Initialization events
        self.init_interactivity()

        # Initialization display
        self.Box = VBox([])

        self.Box_plot = VBox([])
        
        self.tab = widgets.Tab(children=[self.Box,self.Box_plot])
        self.tab.set_title(0,'Calibration')
        self.tab.set_title(1,'Acquired signal')

        self.id_tab = display(self.tab,display_id=True)
        self.update_layout_int()
        

    def oc_dropdown_tube(self,change):

        value = change['new']
        if value == 0:
            self.tube = 'small'
            print('tube changed', self.tube)
        elif value == 1:
            self.tube = 'large'
            print('tube changed', self.tube)


    def oc_calibration(self,arg):
        self.label_update.value = 'Microphone : Calibration ongoing'

        # Acquisition of the signal
        param = Parameters()
        Task_c = TaskAcquisitionGeneration(param,measurement_caracteristics={'name':'Calibration','tube':self.tube,'material':'','resonator':''})
        duration = 4
        Task_c.simple_acquisition(duration=duration)

        # Detection of the active microphone
        if np.mean(abs(np.array(Task_c.acquired_samples[0]))) > np.mean(np.array(abs(np.array(Task_c.acquired_samples[1])))):
            mic = 0
            print('mic 1')
        else:
            mic = 1
            print('mic 2')

        Task_c.name = f'Calibration M {mic+1}'



        # Fourier Transform and pic detection at 1000 Hz
        sig = Task_c.acquired_samples[mic]
        amp = fft(sig)/len(sig)
        freq = fftfreq(len(sig),1/Task_c.sample_rate)
        max_amp,arg = max(abs(amp[freq>0])),np.argmax(abs(amp[freq>0]))
        freq_max = freq[freq>0][arg]

        # update the Parameters
        self.PARAMETERS.calibration_cst[str(mic)] = max_amp
        print(f'The voltage generated by 1000 Hz 94 dB signal is {max_amp}')
        
        Task_c.calibration_cst = self.PARAMETERS.calibration_cst
        Task_c.convert_and_filter_acquired_signal()
        Task_c.convert()

        # Change the icon of the interface and the texte
        print(arg,type(arg))
        Task_c.plot_fft(fig=self.figure_fft_acq_sig)
        Task_c.plot_acquired_signal(fig = self.figure_acq_sig)
        assert 990<freq_max<1010, 'The frequency at which the maximum is reached is below 995 Hz and above 1005 Hz'

        self.label_update.value = 'Microphone: Calibration done'
        try:
            arg.icon ='check'
        except:
            pass

        # self.Task_noise.calibration_cst[mic] = max_amp
        # self.Task_signal.calibration_cst[mic] = max_amp



    def oc_SNR(self,noise_sig,arg):

        duration = 3

        if noise_sig == 0:
            self.Task_noise = TaskAcquisitionGeneration(self.PARAMETERS,measurement_caracteristics={'name':'SNR noise','tube':'','material':None,'resonator':None})
           
            self.label_update.value = 'Noise acquisition ongoing'

            # Noise acquisition
            self.Task_noise.simple_acquisition(duration=duration)
            # self.Task_noise.plot_generated_signal(self.figure_gen_sig)
            self.Task_noise.plot_acquired_signal(fig=self.figure_acq_sig)
            self.Task_noise.plot_fft(fig=self.figure_fft_acq_sig)

            self.label_update.value = 'Noise acquisition done'


        elif noise_sig == 1:
            self.Task_signal = TaskAcquisitionGeneration(self.PARAMETERS,measurement_caracteristics={'name':'SNR signal','tube':'','material':None,'resonator':None})


            self.label_update.value = 'Signal acquisition ongoing'
            # Signal acquisition
            self.Task_signal.generation_white_noise(duration)
            self.Task_signal.plot_generated_signal(self.figure_gen_sig)
            self.Task_signal.plot_acquired_signal(fig=self.figure_acq_sig)
            self.Task_signal.plot_fft(fig=self.figure_fft_acq_sig)

            self.label_update.value = 'Signal acquisition done'



    def return_transfert_function(self,np_p):
        ## Acquisition with normal configuration
        if np_p == 'p':
            if self.tube=='large':
                self.Task_norm_p_large = TaskAcquisitionGeneration(self.PARAMETERS,measurement_caracteristics={'name':f'H_{np_p}','tube':self.tube,'material':None,'resonator':None})
                Task_norm = self.Task_norm_p_large
            else:
                self.Task_norm_p_small = TaskAcquisitionGeneration(self.PARAMETERS,measurement_caracteristics={'name':f'H_{np_p}','tube':self.tube,'material':None,'resonator':None})
                Task_norm = self.Task_norm_p_small

        elif np_p == 'np':
            if self.tube == 'large':
                self.Task_norm_np_large = TaskAcquisitionGeneration(self.PARAMETERS,measurement_caracteristics={'name':f'H_{np_p}','tube':self.tube,'material':None,'resonator':None})
                Task_norm = self.Task_norm_np_large
            else:
                self.Task_norm_np_small = TaskAcquisitionGeneration(self.PARAMETERS,measurement_caracteristics={'name':f'H_{np_p}','tube':self.tube,'material':None,'resonator':None})
                Task_norm = self.Task_norm_np_small

        duration = 5

        Task_norm.generation_white_noise(duration)


        spectro_average(Task_norm,Task_norm.param.df)
        fft_np_mic_2 = Task_norm.av_fft[1]
        fft_np_mic_1 = Task_norm.av_fft[0]
        frequencies_n = Task_norm.av_f

        # fft_np_mic_2 = abs(fft(Task_norm.acquired_samples[0]))/len(Task_norm.acquired_samples[0])  
        # fft_np_mic_1 = abs(fft(Task_norm.acquired_samples[1]))/len(Task_norm.acquired_samples[1])  
        # frequencies_n = fftfreq(len(Task_norm.acquired_samples[0]), 1/Task_norm.sample_rate)  # Fréq
        
        H_np = fft_np_mic_2[frequencies_n>=0]/fft_np_mic_1[frequencies_n>=0]
        self.frequencies_n  = frequencies_n[frequencies_n>=0]

        Task_norm.plot_generated_signal(fig_gs = self.figure_gen_sig)
        Task_norm.plot_acquired_signal(fig = self.figure_acq_sig)
        Task_norm.plot_fft(fig = self.figure_fft_acq_sig)
        return H_np


    def oc_H(self,np_p,arg):
        
        
        if np_p == 0:
            self.label_update.value = "Permuted microphone acquisition ongoing"

            if self.tube=='large':
                self.H_p_large = self.return_transfert_function('p')
            else:
                self.H_p_small = self.return_transfert_function('p')
            
            self.label_update.value = "Permuted microphone acquisition done"

        elif np_p == 1:
            self.label_update.value = "Non - Permuted microphone acquisition ongoing"

            if self.tube == 'large':
                self.H_np_large = self.return_transfert_function('np')
            else:
                self.H_np_small = self.return_transfert_function('np')

            self.label_update.value = "Non - Permuted microphone acquisition done"


    def oc_result(self,arg):
        
        try:
            if self.tube == 'large':
                self.ratio_H = np.sqrt(np.array(self.H_p_large)/np.array(self.H_np_large))
                self.PARAMETERS.H_ratio['large'] = self.ratio_H
                H_p = self.H_p_large
                H_np = self.H_np_large
                range=[50,1600]
            else:
                print('small')
                self.ratio_H = np.sqrt(np.array(self.H_p_small)/np.array(self.H_np_small))
                self.PARAMETERS.H_ratio['small'] = self.ratio_H
                H_p = self.H_p_small
                H_np = self.H_np_small
                range=[500,6400]


            self.fig_SNR.data = []
            
            # self.fig_SNR.add_trace( go.Scatter(x=self.frequencies_n,y=self.ratio_H))
            y_p = 20*np.log10(abs(np.array(H_p))/(2*10**(-5)))
            y_np = 20*np.log10(abs(np.array(H_np))/(2*10**(-5)))
            y_p[(self.frequencies_n<range[0]) | (self.frequencies_n>range[1])] = np.NAN
            y_np[(self.frequencies_n<range[0]) | (self.frequencies_n>range[1])] = np.NAN
            y_p = average_smoothing(y_p,N=5)
            y_np = average_smoothing(y_np,N=5)
            
            self.fig_SNR.add_trace( go.Scatter(x=self.frequencies_n,y=y_p,name='Permuted'))
            self.fig_SNR.add_trace( go.Scatter(x=self.frequencies_n,y=y_np,name='Non Permuted'))
            self.fig_SNR.update_layout(
                    title_text="Transfert function",xaxis_title="Frequency (Hz)", yaxis_title='Gain (dB)',
                    legend_title='<b> Legends </b>'               
                )
            self.fig_SNR.update_layout(xaxis_range=range,yaxis = {"autorange": True,"fixedrange":False})

        except Exception as e:
            self.label_update.value = f'{e} : One of the signal was not recoded, the result can not be computed.'


    def oc_result_SNR(self,arg):
        self.fig_SNR.data = []
        try:
            plot_fft_m([self.Task_noise,self.Task_signal],fig=self.fig_SNR,dB=True,smooth=20)
            if self.tube == 'large':
                range=[50,1600]
            else:
                range=[500,6400]

            self.fig_SNR.update_layout(xaxis_range=range,yaxis = {"autorange": True,"fixedrange":False})


        except Exception as e:
            self.label_update.value = f'{e} : One of the signal was not recorded'
            self.fig_SNR.add_trace(go.Scatter(x=[0],y=[0]))


        
    def oc_clear(self,arg):
        for fig in [self.figure_gen_sig,self.figure_acq_sig, self.figure_fft_acq_sig]:
            fig.data= []
            
    def init_widget_plot(self):

        init_scatter = go.Scatter(x=[],y=[])
        self.figure_gen_sig = go.FigureWidget(data=init_scatter)
        self.figure_gen_sig.update_layout(
            width=800, 
            height=400
        )
        self.figure_acq_sig = go.FigureWidget(data=init_scatter)
        self.figure_acq_sig.update_layout(
            width=800, 
            height=400
        )
        self.figure_fft_acq_sig = go.FigureWidget(data=init_scatter)
        self.figure_fft_acq_sig.update_layout(
            width=800, 
            height=400
        )
        self.button_plot_clear =  widgets.Button(
        description='Clear',
        disabled=False,
        button_style='', 
        tooltip='Clear',
        )



    def init_widget(self):

        init_scatter = go.Scatter(x=[],y=[])
        self.fig_SNR = go.FigureWidget(data=init_scatter)
        self.fig_SNR.update_layout(
            width=800, 
            height=400
        )

        # Button creation
        self.button_calibration_mic_1 = widgets.Button(
        description='Calibration mic 1',
        disabled=False,
        button_style='', 
        tooltip='Calibration mic 1',
        icon='square-o' ,
        )

        self.button_calibration_mic_2 = widgets.Button(
        description='Calibration mic 2',
        disabled=False,
        button_style='', 
        tooltip='Calibration mic 2',
        icon='square-o' ,
        )

        self.button_H_np = widgets.Button(
        description='not swapped',
        disabled=False,
        button_style='', 
        tooltip='not swapped',
        icon='square-o' ,
        )

        self.button_H_p = widgets.Button(
        description='swapped',
        disabled=False,
        button_style='', 
        tooltip='swapped',
        icon='square-o' ,
        )

        self.button_result = widgets.Button(
        description='result',
        disabled=False,
        button_style='', 
        tooltip='result',
        )

        self.button_result_SNR = widgets.Button(
        description='result',
        disabled=False,
        button_style='', 
        tooltip='result',
        )


        self.button_SNR_noise = widgets.Button(
        description='Noise Acquisition',
        disabled=False,
        button_style='', 
        tooltip='Noise Acquisition',
        icon='square-o' 
        )
        
        self.button_SNR_signal = widgets.Button(
        description='Signal Acquisition',
        disabled=False,
        button_style='', 
        tooltip='Signal Acquisition',
        icon='square-o' ,
        )

        self.label_update = widgets.Label(
            value="Update"
        )

    def init_label_setup(self):
        self.label_calibration = Label(value="Microphones Calibration")
        self.label_calibration2 = Label(value="SNR")
        self.label_calibration3 = Label(value="Transfert Function")
        self.label_setup_title = Label(value="Set Up")
        
        self.dropdown_tube = widgets.Dropdown(
            options=[('large',1),('small',0)], 
            value=1,
            description='Tube :',
        )
        


    def init_interactivity(self):
        self.button_calibration_mic_1.on_click(self.oc_calibration)
        self.button_calibration_mic_2.on_click(self.oc_calibration)
        self.button_SNR_noise.on_click(functools.partial(self.oc_SNR, 0))
        self.button_SNR_signal.on_click(functools.partial(self.oc_SNR, 1))

        self.button_H_p.on_click(functools.partial(self.oc_H, 0))
        self.button_H_np.on_click(functools.partial(self.oc_H, 1))

        self.button_result.on_click(self.oc_result)
        self.button_result_SNR.on_click(self.oc_result_SNR)

        self.button_plot_clear.on_click(self.oc_clear)

        self.dropdown_tube.observe(self.oc_dropdown_tube,'value')


    def update_layout_int(self):
        self.Box_calibration_mic = HBox([self.button_calibration_mic_1,self.button_calibration_mic_2])
        self.Box_calibration_SNR = HBox([self.button_SNR_noise,self.button_SNR_signal,self.button_result_SNR])
        self.Box_plot_SNR = HBox([self.fig_SNR])
        self.Box_H = HBox([self.button_H_p,self.button_H_np])
        self.Box_result = HBox([self.button_result])

       
        self.Box_setup = VBox([self.label_setup_title,self.dropdown_tube])

        self.grid = GridspecLayout(10, 3, height='auto',display='flex')
        k = 1
        self.grid[(k-1),:] = self.label_update
        self.grid[k,:] = self.label_calibration
        self.grid[k+1,:] = self.Box_calibration_mic
        self.grid[k+3,:] = self.label_calibration2
        self.grid[k+4,:] = self.Box_calibration_SNR
        self.grid[k+6,:] = self.label_calibration3
        self.grid[k+7,:] = self.Box_H
        self.grid[k+8,:] = self.Box_result

        self.Box = VBox([self.Box_setup,self.grid,self.Box_plot_SNR])

        self.Box_plot = VBox([HBox([fig]) for fig in [self.figure_gen_sig,self.figure_acq_sig, self.figure_fft_acq_sig]] + [HBox([self.button_plot_clear])])
        
        self.tab = widgets.Tab(children=[self.Box,self.Box_plot])
        self.tab.set_title(0,'Calibration')
        self.tab.set_title(1,'Acquired signal')

        clear_output(wait=True)
        self.id_tab.update(self.tab) 

In [96]:
calibration = Calibration(PARAMETERS)

Tab(children=(VBox(children=(VBox(children=(Label(value='Set Up'), Dropdown(description='Tube :', options=(('l…

Generation defined
Acquisition defined
Tasks started


Exception ignored on converting result of ctypes callback function: <bound method TaskAcquisitionGeneration.callback_function of <__main__.TaskAcquisitionGeneration object at 0x0000028B3FA03580>>
TypeError: 'NoneType' object cannot be interpreted as an integer

unclosed socket <zmq.Socket(zmq.PUSH) at 0x28b33a28ee0>

: <bound method TaskAcquisitionGeneration.callback_stop_acquisition of <__main__.TaskAcquisitionGeneration object at 0x0000028B3FA03580>>
TypeError: 'NoneType' object cannot be interpreted as an integer


Sample read
Acquisition stoped


Exception ignored on converting result of ctypes callback function: <bound method TaskAcquisitionGeneration.callable_stop_generation of <__main__.TaskAcquisitionGeneration object at 0x0000028B3FA03580>>
TypeError: 'NoneType' object cannot be interpreted as an integer


Generation stop stoped
Tasks closed
Generation defined
Acquisition defined
Tasks started


Exception ignored on converting result of ctypes callback function: <bound method TaskAcquisitionGeneration.callback_function of <__main__.TaskAcquisitionGeneration object at 0x0000028B41C28280>>
TypeError: 'NoneType' object cannot be interpreted as an integer

unclosed socket <zmq.Socket(zmq.PUSH) at 0x28b2f974f40>

: <bound method TaskAcquisitionGeneration.callback_stop_acquisition of <__main__.TaskAcquisitionGeneration object at 0x0000028B41C28280>>
TypeError: 'NoneType' object cannot be interpreted as an integer


Sample read
Acquisition stoped


Exception ignored on converting result of ctypes callback function: <bound method TaskAcquisitionGeneration.callable_stop_generation of <__main__.TaskAcquisitionGeneration object at 0x0000028B41C28280>>
TypeError: 'NoneType' object cannot be interpreted as an integer


Generation stop stoped
Tasks closed


In [61]:
calibration.Task_norm_p_large.save_to_cjson('08_12_norm_p_small')

In [62]:
calibration.Task_norm_np_large.save_to_cjson('08_12_norm_np_small')

In [63]:
PARAMETERS = calibration.PARAMETERS
ALL_SIGNALS = All_Signal()
PARAMETERS.H_ratio


{'large': array([1.24742187-0.j        , 1.67540322-0.93116377j,
        0.40768343+0.13420848j, ..., 0.26222096+1.55728457j,
        0.17652189-0.79071969j, 1.04401387+0.j        ]),
 'small': array([1.61207286+0.j        , 1.37569091-0.47686963j,
        0.63503987-0.36792887j, ..., 1.86011   +0.08506277j,
        0.07915708+1.39766635j, 0.        +1.27456688j]),
 'combined': 1}

#### Measurements and Plot

In [69]:
class Measurement():
    def __init__(self,ALL_SIGNALS:All_Signal) -> None:

        self.all_signal = ALL_SIGNALS
        # self.all_signal.add_signal(Parameters(),'test_large','large','None','None')
        self.all_signal.add_signal(Parameters(),'test_large_lowd','large','None','None')
        self.all_signal.add_signal(Parameters(),'test_small_lowd','large','None','None')
        # self.all_signal.add_signal(PARAMETERS,'test2','large','None','None')
        # self.all_signal.add_signal(PARAMETERS,'large','large','None','None')
        # self.all_signal.add_signal(PARAMETERS,'small','small','None','None')
        self.all_signal.all_signal['test_small_lowd'].open_from_json('06_12_30s_small_lowdens_2')
        self.all_signal.all_signal['test_large_lowd'].open_from_json('06_12_30s_large_lowdens')
        self.all_signal.all_signal['test_small_lowd'].s = 0.02
        self.all_signal.all_signal['test_small_lowd'].l = 0.035
        self.all_signal.all_signal['test_large_lowd'].l = 0.100
        self.all_signal.all_signal['test_large_lowd'].s = 0.05
        self.all_signal.all_signal['test_large_lowd'].df = 5
        self.all_signal.all_signal['test_large_lowd'].param.df = 5
        self.all_signal.all_signal['test_small_lowd'].df = 5
        self.all_signal.all_signal['test_small_lowd'].param.df = 5
        # self.all_signal.all_signal['test2'].open_from_json('new sys whitenoise np')
        # self.all_signal.all_signal['large'].open_from_json('30 s 2 new sys whitenoise p')
        # self.all_signal.all_signal['small'].open_from_json('30 s small 2 new sys whitenoise p')
        
        self.tube_corr = {0:'Large',1:'Small'}
        self.material_corr = {0:'None',1:'low-density felt', 2: 'hight-density felt',3:'porous alveolate material'}
        self.resonator_corr = {0:'None',1:'R1',2:'R2',3:'R3',4:'R4',5:'R5'}

        # Initialization of the widget
        self.init_widgets()
        # Initialization of the event system
        self.init_interactivity()
        # Initialization of the display
        self.Box_list_sig = HBox([self.select_measurement])
        self.Box_tube = HBox([self.dropdown_tube])
        self.Box_material = HBox([self.dropdown_material,self.dropdown_resonator])
        self.Box_name = HBox([self.text_name])
        self.Box_add_del = HBox([self.button_add, self.button_delete])
        self.Box_measure = HBox([self.button_measure])
        self.Box = VBox([self.Box_list_sig,self.Box_tube,self.Box_material,self.text_name,self.Box_add_del,self.Box_measure])

        # self.Box_id = display(self.Box,display_id=True)

        ## Plot
        self.init_widgets_plot()
        self.init_interactivity_plot()
        self.plot_corr = {1:plot_av_fft_m,
                          2:lambda x,fig : plot_computation_R_a_Z_coefficient_m(x,'R',fig),
                          3:lambda x,fig : plot_computation_R_a_Z_coefficient_m(x,'a',fig),
                          4:lambda x,fig : plot_computation_R_a_Z_coefficient_m(x,'Z',fig),
                          5:plot_computation_ratio_H_m,
                          6:plot_computation_coherence_m}
        self.plot_corr_metric = {1:'fft',
                          2:'R',
                          3:'a',
                          4:'Z',
                          5:'H12',
                          6:'coh'}

        self.Box_plot= VBox([self.select_multiple_signals,self.dropdown_plot_function,self.button_plot,self.button_combine,self.figure,self.button_clear])

        self.tab = widgets.Tab(children=[self.Box,self.Box_plot])
        self.tab.set_title(0,'Measurements')
        self.tab.set_title(1,'Plot')

        display(self.tab)
    
    
    def init_widgets_plot(self):
        # List of the created Measurement (Signal)
        self.select_multiple_signals = widgets.SelectMultiple(
            options= self.all_signal.names,
            # value = self.all_signals.names[0],
            description='Signals : ',
            disabled=False
        )

    
        self.button_plot = widgets.Button(
            description='Plot',
            disabled=False,
            tooltip='Plot',
        )
        self.button_clear = widgets.Button(
            description='Clear',
            disabled=False,
            tooltip='Clear',
        )

        self.dropdown_plot_function = widgets.Dropdown(
            options=[('fft',1),('Reflexion coeff',2),('Absorption coeff',3),('Impedance',4),('Transfert',5),('Coherence',6)], 
            value=1,
            description='Figure :',
        )

        init_scatter = go.Scatter(x=[],y=[])
        self.figure = go.FigureWidget(data=init_scatter)
        self.figure.update_layout(
            width=800, 
            height=400
        )

        self.button_combine = widgets.Button(
            description = 'Combine',
            disabled = False,
            tooltip = 'Combine'
        )

    def oc_plot_multiple(self,button):

        # get all the signal
        selected_signals = [self.all_signal.all_signal[s] for s in self.select_multiple_signals.value]
        selected_plot = self.dropdown_plot_function.value
        for signal in selected_signals:

            if signal.tube == 'large' or signal.tube == 'small':
                selected_plot_function = self.plot_corr[selected_plot]
                selected_plot_function(selected_signals,fig=self.figure)
            else:
                # if the signal is a combined signal
                metric = self.plot_corr_metric[selected_plot]
                merged_metric = combine_metric(signal,metric)
                freq = signal.signal_large.av_f

                if metric == 'fft':
                    self.figure.add_trace(go.Scatter(x=freq,y=abs(merged_metric[0]),name=f'{signal.name} {metric} M1'))
                    self.figure.add_trace(go.Scatter(x=freq,y=abs(merged_metric[1]),name=f'{signal.name} {metric} M2'))
                else:
                    if metric != 'a':
                        self.figure.add_trace(go.Scatter(x=freq,y=abs(merged_metric),name=f'{signal.name} {metric}'))
                    else:
                        self.figure.add_trace(go.Scatter(x=freq,y=merged_metric,name=f'{signal.name} {metric}'))
            if metric == 'coh':
                self.figure.update_layout(yaxis_range=[0,2],yaxis = {"autorange": True,"fixedrange":False})
            else:
                self.figure.update_layout(yaxis = {"autorange": True,"fixedrange":False})






    def oc_clear_figure(self,button):
        self.figure.data = []


    def init_interactivity_plot(self):
        self.button_plot.on_click(self.oc_plot_multiple)
        self.button_clear.on_click(self.oc_clear_figure)
        self.button_combine.on_click(self.oc_button_combine)

        






# show a list of already existing signal
# posibility of selecting one signal to delete it
# delete button
# add button # fiel the tube standard, the padding and for the name that is prefiel with the parameters

# show different color id the signal are not acquired for the name

# add a button take measurement

# oc_measurement
# get the signal
# lance une acquisition et generation
    def init_widgets(self):
        # List of the created Measurement (Signal)
        self.select_measurement = widgets.Select(
            options= self.all_signal.names,
            value = None,
            description='Signals : ',
            disabled=False
        )

        self.button_delete = widgets.Button(
        description='Delete',
        disabled=False,
        tooltip='Delete',
        )

        self.button_add = widgets.Button(
        description='Add',
        disabled=False,
        tooltip='Add',
        )

        self.button_measure = widgets.Button(
        description='Measure',
        disabled=False,
        tooltip='Measure',
        )

        self.dropdown_tube = widgets.Dropdown(
            options=[('Large', 0), ('Small', 1)],
            value=0,
            description='Tube : ',
        )
    
        self.dropdown_material = widgets.Dropdown(
            options=[('None', 0),('low-density felt', 1), ('hight-density felt', 2), ('porous alveolate material', 3)],
            value=0,
            description='Material : ',
        )

        self.dropdown_resonator = widgets.Dropdown(
            options=[('None',0),('R1', 1), ('R2', 2), ('R3', 3),('R4', 4),('R5', 5)],
            value=0,
            description='Resonator : ',
        )

        self.text_name = widgets.Text(
            value='',
            placeholder='Name of the signal',
            description='String : ',
            disabled=False
        )

        

    def update_name(self,change):
        def get_key(dropdown):
            dict_ = dict(dropdown.options)
            val = dropdown.value
            key_list = list(dict_.keys())
            val_list = list(dict_.values())
            arg = val_list.index(val)
            return key_list[arg]
        
        self.text_name.value = f"{'Large' if self.dropdown_tube.value == 0 else 'Small'}" +' | ' + get_key(self.dropdown_material) + ' | ' + get_key(self.dropdown_resonator)


    def oc_dropdown_tube(self,change):
        # If the value corresponds to the large tube
        # then the user can select a material and/or a resonator
        # if the value corresponds to the small tube
        # then the user can select a material

        value = change['new']
        if value == 0:
            self.dropdown_resonator.options = [('None',0),('R1', 1), ('R2', 2), ('R3', 3),('R4', 4),('R5', 5)]
            # new_Box = HBox([self.dropdown_material,self.dropdown_resonator])

        elif value == 1:
            self.dropdown_resonator.options = [('None',0)]

        self.update_name(change=None)
        
            # new_Box = HBox([self.dropdown_material])

        # self.Box_material = new_Box
        # self.construct_interface()


    def oc_button_delete(self,button):
        # delete the name from the the list
        # change de options from the list
        selected_name = self.select_measurement.value
        self.all_signal.del_signal(selected_name)
        self.select_measurement.options = self.all_signal.names
        self.select_multiple_signals.options = self.all_signal.names

    def oc_button_add(self,button):
        # Get the caracteristic of the measurement
        tube = self.tube_corr[self.dropdown_tube.value].lower()
        material = self.material_corr[self.dropdown_material.value]
        resonator = self.resonator_corr[self.dropdown_resonator.value]
        name = self.text_name.value

        # Create a new signal
        self.all_signal.add_signal(calibration.PARAMETERS,name,tube,material,resonator)
        self.select_measurement.options = self.all_signal.names
        self.select_multiple_signals.options = self.all_signal.names

        self.text_name.value = ''

    def oc_button_measure(self,button):
        selected_name = self.select_measurement.value
        signal = self.all_signal.all_signal[selected_name]
        signal.measurement()

    def oc_button_combine(self,button):
        selected_names = self.select_multiple_signals.value
        if (self.all_signal.all_signal[selected_names[0]].tube == 'large') & (self.all_signal.all_signal[selected_names[1]].tube == 'small'):
            signal_large = self.all_signal.all_signal[selected_names[0]]
            signal_small = self.all_signal.all_signal[selected_names[1]]
        elif (self.all_signal.all_signal[selected_names[1]].tube == 'large') & (self.all_signal.all_signal[selected_names[0]].tube == 'small'):
            signal_large = self.all_signal.all_signal[selected_names[1]]
            signal_small = self.all_signal.all_signal[selected_names[0]]
        else:
            return 0
        
        combined_signal,combine_f = combine_signal(signal_large,signal_small)
        combined_seg,combined_seg_f = combine_signal_seg(signal_large,signal_small)
        combined_H12 = combine_signal_seg(signal_large,signal_small,H12=True) #AJOUTER BOOL H12=TRUE

        # Creation of a new Signal
        # Get the caracteristic of the measurement
        tube = 'combined'
        material = signal_large.material
        resonator = signal_large.resonator
        name = f"Combined" + f' | {material} | {resonator}'
        if name in list(ALL_SIGNALS.all_signal.keys()):
            name = name + ' 1'
            i = 2

        self.all_signal.add_signal(calibration.PARAMETERS,name,tube,material,resonator)
        self.all_signal.all_signal[name].av_fft = combined_signal
        self.all_signal.all_signal[name].av_f = combine_f
        self.all_signal.all_signal[name].seg = combined_seg
        self.all_signal.all_signal[name].H_ratio = combined_H12
        self.all_signal.all_signal[name].signal_large = signal_large
        self.all_signal.all_signal[name].signal_small = signal_small

        self.select_measurement.options = self.all_signal.names
        self.select_multiple_signals.options = self.all_signal.names



    def init_interactivity(self):
        self.dropdown_tube.observe(self.oc_dropdown_tube,'value')
        self.button_delete.on_click(self.oc_button_delete)
        self.button_add.on_click(self.oc_button_add)
        self.button_measure.on_click(self.oc_button_measure)
        self.dropdown_material.observe(self.update_name)
        self.dropdown_resonator.observe(self.update_name)


    def construct_interface(self):
        self.Box = VBox([self.Box_list_sig,self.Box_tube,self.Box_material,self.text_name,self.Box_add_del,self.Box_measure])
        clear_output(wait=True)
        self.Box_id.update(self.Box) 


Box_Measurement = Measurement(ALL_SIGNALS)


The signal already exists
The signal already exists


Tab(children=(VBox(children=(HBox(children=(Select(description='Signals : ', options=('test_large_lowd', 'test…

In [70]:
Box_Measurement.all_signal.all_signal.keys()

dict_keys(['test_large_lowd', 'test_small_lowd', 'Small | low-density felt | None', 'Large | low-density felt | None', 'Combined | low-density felt | None'])

In [71]:
test = Box_Measurement.all_signal.all_signal['Small | low-density felt | None']

In [72]:
test.save_to_cjson('08_12_30s_small_lowdens')

# TESTS ET AUTRES

### Coherence

In [43]:
param = Parameters()
p_large = TaskAcquisitionGeneration(param,{'name':'p','tube':'large','material':'','resonator':''})
p_large.open_from_json('06_12_30s_large_lowdens')
np_large = TaskAcquisitionGeneration(param,{'name':'np','tube':'large','material':'','resonator':''})
np_large.open_from_json('08_12_norm_np_large')

signal = p_large
spectro_average(signal,signal.param.df)
# S = lambda i,j: signal.seg[i][:,:]*signal.seg[j][:,:]
S = lambda i,j: signal.av_fft[i]*signal.av_fft[j]
signal.coh = np.nanmean( (S(0,1)*S(1,0))/(S(0,0)*S(1,1)), axis = 0)

In [50]:
signal.av_fft[0][:3],signal.av_fft[1][:3],signal.av_fft[0][:3]*signal.av_fft[1][:3]

(array([-0.00049906+0.00000000e+00j,  0.00060861+1.43282989e-04j,
        -0.00029091+9.64887644e-05j]),
 array([ 0.00114911+0.j        , -0.00141829-0.00011008j,
         0.00124054-0.00072731j]),
 array([-5.73469208e-07+0.00000000e+00j, -8.47412429e-07-2.70212257e-07j,
        -2.90710815e-07+3.31281584e-07j]))

In [55]:
mult = lambda i : signal.av_fft[0][i]*signal.av_fft[0][i]
mult(1)

(3.498765660786667e-07+1.7440702619525618e-07j)

In [46]:
y = abs(S(0,1))

### H_ratio

In [73]:
param = Parameters()
p_large = TaskAcquisitionGeneration(param,{'name':'p','tube':'large','material':'','resonator':''})
p_large.open_from_json('08_12_norm_p_large')
np_large = TaskAcquisitionGeneration(param,{'name':'np','tube':'large','material':'','resonator':''})
np_large.open_from_json('08_12_norm_np_large')


In [74]:
spectro_average(p_large,p_large.param.df)
spectro_average(np_large,np_large.param.df)
fig_fft = go.Figure()
# plot les fft respective
#check min man center, decalage
H_12 = np.array(p_large.av_fft)/np.array(np_large.av_fft)
fig_H12 = go.Figure()
fig_H12.add_trace(go.Scatter(y=abs(H_12[0])))


### Combine

In [None]:
param = Parameters()

In [None]:
LARGE = TaskAcquisitionGeneration(param,{'name':'LARGE','tube':'large','material':'','resonator':''})
SMALL = TaskAcquisitionGeneration(param,{'name':'LARGE','tube':'small','material':'','resonator':''})
SMALL.open_from_json('06_12_30s_small_lowdens_2')
LARGE.open_from_json('06_12_30s_large_lowdens')

In [None]:
arr_combined = combine_signal(LARGE,SMALL)
COMBINED = TaskAcquisitionGeneration(LARGE.param,{'name':'LARGE','tube':'combined','material':'','resonator':''})
COMBINED.signal_large = LARGE
COMBINED.signal_small = SMALL

In [None]:
combine_metric(COMBINED,'a')

In [None]:
fig = go.Figure()
fig.add_trace(go.Scatter(y=abs(COMBINED.a)))

In [None]:
nb_point = 1600-500
df = signal_large.av_f[1]
w = windows.get_window(('gaussian',nb_point*2/6),nb_point*2)
# fig,ax = plt.subplots()
weigth = w[int(len(w)/2):]
# ax.plot(w[int(len(w)/2):])

print('av fft')
frequencies_small = frequencies_limits(signal_small)
frequencies_large = frequencies_limits(signal_large)

signal_large_crop = signal_large.av_fft[0]
signal_small_crop = signal_small.av_fft[0]
signal_large_crop[~frequencies_large] = np.NaN
signal_small_crop[~frequencies_small] = np.NAN

# merge = (signal_large.av_fft[0][-len(weigth):] * weigth + signal_small.av_fft[0][:len(weigth)] * weigth[::-1])
merge = (signal_large.av_fft[0][int(50/df):int(1600/df)][-int(nb_point/df):]* weigth  + signal_small.av_fft[0][int(500/df):int(6401/df)][:int(nb_point/df)]* weigth[::-1] )/(weigth[::-1]+weigth)
signal_merged = np.hstack([np.ones((50))*np.NAN,signal_large.av_fft[0][int(50/df):int(1600/df)-int(nb_point/df)], merge, signal_small.av_fft[0][frequencies_small][int(nb_point/df):]])

signal_small.plot_av_fft()
y1 = np.hstack([signal_large_crop[(signal_large.av_f>=0)&(signal_large.av_f <= signal_large.param.large_fmax)][0:-nb_point]])
y2 = np.hstack([signal_large_crop[(signal_large.av_f>=0)&(signal_large.av_f <= signal_large.param.large_fmax)][0:-nb_point], merge ])
y3 = np.hstack([signal_large_crop[(signal_large.av_f>=0)&(signal_large.av_f <= signal_large.param.large_fmax)][0:-nb_point], merge , signal_small_crop[frequencies_small][nb_point:]])

fig = go.Figure()

signal_large.plot_av_fft(fig=fig)
signal_small.plot_av_fft(fig=fig)
fig.add_trace(go.Scatter(x=np.arange(0,len(signal_merged)),y=signal_merged))
fig.add_trace(go.Scatter(x=np.arange(500,500+len(weigth)),y=weigth*np.nanmax(signal_large_crop)))
fig.add_trace(go.Scatter(x=np.arange(500,500+len(weigth)),y=weigth[::-1]*np.nanmax(signal_large_crop)))

# fig.add_trace(go.Scatter(x=np.arange(0,len(y2)),y=y2))

# fig.add_trace(go.Scatter(x=np.arange(0,len(y1)),y=y1))

# fig.add_trace(go.Scatter(x=np.arange(1,len( w[int(len(w)/2):])),y=weigth))


fig


### spectrogramme

In [None]:
def fft_averaged(name,df,fig=None):

    if fig == None:
        fig = go.Figure()
        fig_bool = True
    else:
        fig_bool = False

    Task_spec = TaskAcquisitionGeneration(param)
    Task_spec.open_from_json(name)
    x = np.array(Task_spec.acquired_samples[0])
    fe = Task_spec.sample_rate
    # df = 1/t = fe/N  =>N = fe/df
    N = int(fe/df)
    overlap = int(N //8)
    # overlap = 0
    segment = np.array(x[:N])

    for i in range(1,int(len(x)//(N-overlap)-1)):
        segment = np.vstack((segment,np.array(x[i*(N-overlap):i*(N-overlap)+N])))

    print(f'nb of win : {len(x)//(N-overlap)}, nb of points : {N}, overlap : {overlap} , shape segments : {np.shape(segment)}')

    freq = fftfreq(N,1/fe)
    segment_fft = [abs(fft(seg))/N for seg in segment]
    segment_fft = [seg[freq>=0] for seg in segment_fft]

    segment_fft_av = np.mean(segment_fft,axis=0)
    print(np.shape(segment_fft_av))

    fig.add_trace( go.Scatter(x = freq[freq>=0] , y=segment_fft_av,name=f'{df}'))

    if fig_bool:
        fig.show()



In [None]:
param = Parameters()

In [None]:
LARGE = TaskAcquisitionGeneration(param,{'name':'LARGE','tube':'large','material':'','resonator':''})
SMALL = TaskAcquisitionGeneration(param,{'name':'LARGE','tube':'small','material':'','resonator':''})

In [None]:
# LARGE.open_from_json('LARGE_30s_LOWDENS')
SMALL.open_from_json('06_12_30s_small_lowdens_2')
LARGE.open_from_json('06_12_30s_large_lowdens')


In [None]:
LARGE.param.df,SMALL.param.df

In [None]:
spectro_average(LARGE,5)
spectro_average(SMALL,5)
df = LARGE.av_f[1]
nb_point = int((1600-500)/df)

# full scale frequencies
frequencies = LARGE.av_f
# crop scale frequencies

signal_large = LARGE.av_fft[0]
signal_small = SMALL.av_fft[1]

# Gaussian window to smooth the transitions
w = windows.get_window(('gaussian',nb_point*2/6),nb_point*2)
weigth = w[int(len(w)/2):]

# select the right range for each signal
frequencies_large = (frequencies>=50) & (frequencies <= 1600)
frequencies_small = (frequencies>=500) & (frequencies <= 6400)

signal_merged = [[],[]]

merge = (signal_large[int(50/df):int(1600/df)][-int(nb_point):]* weigth  + signal_small[int(500/df):int(6401/df)][:int(nb_point)]* weigth[::-1] )/(weigth[::-1]+weigth)
signal_merged = np.hstack([np.ones(int(50/df))*np.NAN,signal_large[frequencies_large][:-int(nb_point/df)-1], merge, signal_small[frequencies>=500][int(nb_point/df):]])

    
# signal_merged = [[],[]]

# for i in [0,1]:
#     seg_merged = []
#     for j in range(np.shape(signal_small.seg[i])[0]):
#         seg_large = signal_large.seg[i][j,:]
#         seg_small = signal_small.seg[i][j,:]
#         seg_merged += [combine_seg(seg_large,seg_small)]
#     signal_merged[i] = np.array(seg_merged)

# if H12 :
#     H12_merged = combine_seg(signal_large.H_ratio,signal_small.H_ratio)
#     return H12_merged


In [None]:
np.shape(signal_large[int(50/df):int(1600/df)][-int(nb_point):]),np.shape(signal_small[int(500/df):int(6401/df)][:int(nb_point)])

In [None]:
merged_metric = combine_metric(LARGE,SMALL,'R')
merged_metric = average_smoothing(abs(merged_metric),20)
fig = go.Figure()
fig.add_trace(go.Scatter(y=abs(merged_metric)))

In [None]:
fig = go.Figure()
fig.add_trace(go.Scatter( y = abs(LARGE.R)))
fig.add_trace(go.Scatter(y = abs(SMALL.R)))

# quasi meme graph

In [None]:
plot_av_fft_m([LARGE,SMALL])

In [None]:
def spectro_average_test(signal,df):
     
    fe = signal.sample_rate
    # df = 1/t = fe/N  =>N = fe/df
    N = int(fe/df)
    overlap = int(N//6)
    signal.av_fft = [[],[]]
    signal.seg = [[],[]]

    for i in  [0,1]:
        f_ov, t_ov,Sxx_ov = spectrogram(np.array(signal.acquired_samples[i]),fe,mode='complex',scaling='spectrum',window='hann',nperseg=N,noverlap=overlap)
        Sxx_ov = Sxx_ov*2
        # print('Sxx_ov', np.shape(Sxx_ov)) # Sxx_ov (7501, 5)
        signal.seg[i] = np.transpose(Sxx_ov) # COMPLEXE
        # print('Sxx_ov_trans après transpoer', np.shape(signal.seg[i])) # Sxx_ov_trans après transpoer (5, 7501)
        # print('Sxx_ov apres transpose', np.shape(Sxx_ov)) # Sxx_ov apres transpose (7501, 5)

        Sxx_ov_av = np.mean(Sxx_ov,axis=1)
        # print('mean',np.shape(Sxx_ov_av)) #mean (7501,)
        signal.av_fft[i] = Sxx_ov_av
    
    signal.av_f = f_ov

In [None]:
fig_av_fft = go.Figure()

sig = SMALL
spectro_average_test(sig,5)
fig_av_fft.add_trace(go.Scatter(y=abs(sig.av_fft[0])))
fig_av_fft.add_trace(go.Scatter(y=abs(sig.av_fft[1])))


f = sig.av_f
mic1 = sig.seg[0][:,:]
mic2 = sig.seg[1][:,:]
fig_av_fft.add_trace(go.Scatter(y=abs(mic1[0])))
fig_av_fft.add_trace(go.Scatter(y=abs(mic1[1])))
fig_av_fft


In [None]:
S = lambda i,j: sig.seg[i][:,:].conjugate()*sig.seg[j][:,:]
sig.coh = np.nanmean( (S(0,1)*S(1,0))/(S(0,0)*S(1,1)), axis = 0)

In [None]:
fig_coh = go.Figure()
fig_coh.add_trace(go.Scatter(y=np.round(abs(sig.coh),5)))

In [None]:
try:
    H_ratio = sig.H_ratio
except:
    print('No H ratio')
    H_ratio = 1
sig.k =  2*np.pi*f/sig.c0
win = 5
H_ratio = 1

# H_ratio = np.hstack([H_ratio[:win], [np.mean(H_ratio[i*win:(i+1)*win]) for i in range(int(len(H_ratio)/5)-win+1)]])
expiksn = np.repeat([ np.exp(-1j*sig.k*sig.s)],np.shape(sig.seg[0][:,:])[0],axis=0)
expiksp = np.repeat([ np.exp(1j*sig.k*sig.s)],np.shape(sig.seg[0][:,:])[0],axis=0)
R0 = (H_ratio*(mic2/mic1) - expiksn)/(expiksp - H_ratio*(mic2/mic1))
sig.R = np.mean(np.array([np.exp(1j*2*sig.k*0.15)] * R0 ),axis=0) #COMPLEX

In [None]:
np.shape(H_ratio)

In [None]:
np.max(abs(mic2/mic1))

In [None]:
# fig_ratio = go.Figure()
# fig_ratio.add_trace(go.Scatter(y = abs(np.mean(H_ratio*(mic2/mic1) - expiksn,axis=0))))
# fig_ratio.add_trace(go.Scatter(y = abs(np.mean((expiksp - H_ratio*(mic2/mic1)),axis=0))))


In [None]:
sig.s,sig.c0

In [None]:
print('expiksn', np.shape(expiksn))
print('expiksp', np.shape(expiksp))
print('H_ratio*H-exp',np.shape(H_ratio*(mic2/mic1)- expiksn))
print('R0',np.shape(R0))
print('R',np.shape(sig.R),type(sig.R[0]))

In [None]:
combine_metric

In [None]:
fig_R = go.Figure()
fig_R.add_trace(go.Scatter(y=abs(sig.R)))