In [None]:
import os
current_folder = os.path.dirname(os.path.abspath("__file__")) # Gets the location of the present script
parent_folder = os.path.dirname(current_folder) # Gets the path for the origin of the repository, as it is locally stored
utils_folder = os.path.join(parent_folder, 'utilities') # Creates a view to the utilites the folder
os.chdir(utils_folder) # Makes it the active folder
from metro_utils import * # Imports everything from metro_utils
from rf_utils import * # Imports everything from rf_utils
from LabberReader import * # Uncomment if you have access to the Labber API
os.chdir(current_folder) # Makes the folder of the script the active one

In [3]:
import xarray as xr #estensione di pandas per lavorare con array multidimensionali
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
from scipy.signal import savgol_filter, find_peaks, argrelextrema #strumenti per signal processing: Applies a Savitzky-Golay filter to an array, Finds peaks inside a signal based on peak properties, Calculate the relative extrema of data
from scipy.optimize import curve_fit #Use non-linear least squares to fit a function, f, to data
from scipy.stats import linregress #Calculate a linear least-squares regression for two sets of measurements
from typing import Optional #Typing: This module provides runtime support for type hints, Optional: Use Optional to indicate that an object is either one given type or None
from itertools import product #provides various functions for creating and manipulating iterators, product: computes the cartesian product of input iterables
import matplotlib.patches as patches # Patches are arbitrary two dimensional regions

In [None]:
def dB(x, n=2):
    return n * 10 * np.log10(x)

#definisco l'"inversa" di db
def linear(x, n=1):
    return 10**(x / (n * 10))

#Definisco l'equazione per una parabola
def parabula(x, a, b, c):
    x = np.array(x)
    return a*x**2 + b*x +c

#Definisco la combinazione lineare di due gaussiane di media x1, x2 e std s1, s2
def two_gaussians(x, A1, A2, x1, x2, s1, s2):
    return A1 * np.exp(-((x-x1)/s1)**2) + A2 * np.exp(-((x-x2)/s2)**2)

def padded_moving_average(data, window=11):
    #add data outside range to perform moving average on the first and last points
    padded_data = np.pad(data, (window//2, window//2), mode='reflect')

    #returns the smoothed function
    return np.convolve(padded_data, np.ones(window)/window, mode="valid")

In [5]:
def fft_on_diff(profile: xr.core.dataarray.DataArray, 
                average: np.ndarray):
    '''
    Function that takes the difference between the original profile and the smoothed one with moving average as input,
    computes its fft and returns the fft maximum and its correspondent time in order to choose the best window.
    '''
    #Calcoliamo la differenza
    diff_profile = profile-average

    #calcoliamo la trasformata
    fft = np.fft.fft(diff_profile)
    magnitude = np.abs(fft)

    #dare un range per le x
    frequencies = np.linspace(0.4e10, 1.2e10, len(diff_profile))
    time = frequencies
    dt = np.mean(np.diff(time))

    fft_frequencies = np.fft.fftfreq(len(diff_profile), d=dt)

    #plots
    fig, ax = plt.subplots(2, 1, figsize=[10, 5], dpi=200)

    #difference plot
    ax[0].plot(time, diff_profile, color="red", label="Profile minus average")
    ax[0].set_xlabel("Frequency (Hz)") 
    ax[0].set_ylabel("|S21| / dB")
    ax[0].legend()
    ax[0].grid()

    #fft plot
    ax[1].plot(fft_frequencies[fft_frequencies >= 0], 
            magnitude[fft_frequencies >= 0], 
            color='blue', label="FFT")
    ax[1].set_xlabel("Time (s)")
    ax[1].set_ylabel("Magnitude")
    ax[1].legend()
    ax[1].grid()

    fig.tight_layout()
    plt.show()

    #Find maximum
    max_fft = np.max(magnitude[fft_frequencies >= 0])
    max_time = fft_frequencies[fft_frequencies > 0][np.argmax(magnitude[fft_frequencies >= 0])]
    ind_max = np.argmax(magnitude[fft_frequencies >= 0])

    print(f'Massimo\np0: {max_fft}\nTime: {max_time} s\nIndice del massimo: {ind_max}')

    prop = max_fft/max_time

    print(f"Prodotto = {prop}\nFreq = {1/max_time} Hz\n")

    return max_fft, 1/max_time 

In [1]:
#function that returns the optimal window width to perform the moving average

#Funziona con argo e crescendo, verificare su eventuali altri dispositivi
def best_window(p0, frequency, alpha=0.015, step=16e6):
    if int(p0*frequency*alpha/step)%2 == 1 and int(p0*frequency*alpha/step) <= 13:
        return int(p0*frequency*alpha/step)
    elif int(p0*frequency*alpha/step)%2 == 0 and int(p0*frequency*alpha/step)-1 <= 13:
        return int(p0*frequency*alpha/step)-1
    else:
        return 13

In [6]:
def find_G_BW(
        x: np.ndarray, # The frequency array
        y: np.ndarray, # The trace
        phase_matching_scheme: {'Dispersion engineering', 'Reversed Kerr'}, #dispersion engineering è il profilo "bucato"
        f_p  = None,
        w: float = 0.1e9, # The window width (in Hz) for Svaitsky Golay filter
        min_prominence=0.1, # Relative minimal prominace of the local minima to be used in the algorithm for locating the stopband
        cut_off=None, # An upper cut-off on frequency to avoid pathological behaviour
        ):
    y = 20*np.log10(np.abs(y))
    if cut_off is not None:
        x, y = x[x<=cut_off], y[x<=cut_off]
    dx = x[1]-x[0]
    # limiting the examined region at the actual amplification band
    try: #L'istruzione try except mi permette di intercettare uno o più errori nell'esecuzione di un blocco di istruzioni tramite la gestione delle eccezioni
        edges = x[y>=0][0], x[y>=0][-1] #considero gli elementi di x in cui y è non negativa e prendo i bordi, dove la traccia è y = 20*np.log10(np.abs(y))
    except IndexError:
        return 0
    amp_x, amp_y = x[x >= edges[0]], y[x >= edges[0]]
    amp_x, amp_y = amp_x[amp_x <= edges[-1]], amp_y[amp_x <= edges[-1]] 

    if phase_matching_scheme == 'Dispersion engineering':

        # minima whose prominence is at least 10 % of the whole range of transmission values
        min_idxs = find_peaks(-amp_y, prominence=min_prominence*(np.max(amp_y)-np.min(amp_y)))[0]

        # the minima laying in region of positive concavity are selected
        try:
            y_der2 = savgol_filter(amp_y, window_length=int(w / dx), polyorder=2, deriv=2)
        except ValueError:
            return 0
        selected_min_idxs = []
        for min_idx in min_idxs:
            if np.interp(amp_x[min_idx], amp_x, y_der2) > 0:
                selected_min_idxs.append(min_idx)

        # the points of change of concavity sign that enclose the selected minima
        zeros_indices = np.where(np.diff(y_der2 > 0))[0]
        i1 = np.max(zeros_indices[zeros_indices <= np.min(selected_min_idxs)])
        i2 = np.min(zeros_indices[zeros_indices >= np.max(selected_min_idxs)])

        # The fit will be permormed on a new set of points the bandgap has been excluded from
        x_fit, y_fit = np.array([*amp_x[:i1], *amp_x[i2:]]), np.array([*amp_y[:i1], *amp_y[i2:]]) # * usato per passare un numero variabile di argomenti

        # I fit the two regions with lines, to find their intersection and use it as a guess on the parabula's vertex
        r1 = linregress(amp_x[:i1], amp_y[:i1])
        r2 = linregress(amp_x[i2:], amp_y[i2:])
        x0 = (r2.intercept - r1.intercept)/(r1.slope-r2.slope)
        y0 = r2.slope * x0 + r2.intercept

        a = (amp_y[0]-y0)/(amp_x[0]-x0)**2 
        b = 2*a*x0
        c = y0 + a*x0**2

        popt, _ = curve_fit(
            parabula, 
            x_fit, 
            y_fit, 
            p0=(a,b,c),
            )
        
        #popt sarà un array. Optimal values for the parameters so that the sum of the squared residuals of f(xdata, *popt) - ydata is minimized.
        #_ 

        # The two gains are the maximum values of the parabula in the two regions
        G_LRB = np.max(parabula(amp_x[:i1], *popt))
        G_URB = np.max(parabula(amp_x[i2:], *popt))

        # The frequency edges of the usable read-out bands are those that keep the transmission above half the quoted gain
        f1_LRB = np.max(amp_x[:i1][parabula(amp_x[:i1], *popt) >= G_LRB - 3])
        f2_LRB = np.min(amp_x[:i1][parabula(amp_x[:i1], *popt) >= G_LRB - 3])
        BW_LRB = np.abs(f2_LRB - f1_LRB)

        f1_URB = np.max(amp_x[i2:][parabula(amp_x[i2:], *popt) >= G_URB - 3])
        f2_URB = np.min(amp_x[i2:][parabula(amp_x[i2:], *popt) >= G_URB - 3])
        BW_URB = np.abs(f2_URB - f1_URB)

    else:
        try:
            popt, _ = curve_fit(
                two_gaussians, 
                amp_x, 
                amp_y, 
                # bounds=([0,30],[0,30], [4e9, f_p], [f_p, 12e9], [0, 8e9], [0,8e9]),
                bounds = ([0,0,4e9,f_p,0,0], [30,30,f_p,12e9, 8e9, 8e9])
                )
        except RuntimeError:
            return -100
        
        G_LRB, G_URB = popt[0], popt[1]
        one_band = False
        try:
            i_split = argrelextrema(amp_y, np.less)[0][0] #np.less: Return the truth value of (x1 < x2) element-wise
        except IndexError:
            i_split = len(amp_x)-1
            one_band = True

        
        # The frequency edges of the usable read-out bands are those that keep the transmission above half the quoted gain
        f1_LRB = np.max(amp_x[:i_split][two_gaussians(amp_x[:i_split], *popt) >= G_LRB - 3])
        f2_LRB = np.min(amp_x[:i_split][two_gaussians(amp_x[:i_split], *popt) >= G_LRB - 3])
        BW_LRB = np.abs(f2_LRB - f1_LRB)

        if not one_band:
            f1_URB = np.max(amp_x[i_split:][two_gaussians(amp_x[i_split:], *popt) >= G_URB - 3])
            f2_URB = np.min(amp_x[i_split:][two_gaussians(amp_x[i_split:], *popt) >= G_URB - 3])
            BW_URB = np.abs(f2_URB - f1_URB)
        else:
            BW_URB = 0

    metric = (linear(G_LRB)*BW_LRB + linear(G_URB)*BW_URB)/1e9
    
    return metric

In [7]:
def GainMap(filename: str, 
            phase_matching_scheme: {'Dispersion engineering', 'Reversed Kerr'}, 
            w: float = 0.4e9, 
            min_prominence: float =0.2, 
            cut_off: Optional[float] = 10e9, 
            top_k: int = 5):
    ds = xr.open_dataset(filename, engine='h5netcdf')
    x = ds['Frequency'].data
    y = ds['S21']
    figure_of_merit = xr.zeros_like(y.real.mean('Frequency'))
    figure_of_merit.attrs = dict(long_name=r'$\text{GB}_\text{eff}$', unit='GHz')
    for I in product(*[range(size) for size in figure_of_merit.shape]):
        local_coords = {coord: figure_of_merit[coord][i] for coord, i in zip(figure_of_merit.coords.keys(), I)}
        my_y = y.loc[local_coords].data
        
        try:
            #metric = find_G_BW(x, my_y, w=w, phase_matching_scheme=phase_matching_scheme, f_p=local_coords.get('Pump frequency') if phase_matching_scheme == 'Reversed Kerr' else None, min_prominence=min_prominence, cut_off=cut_off)
            metric = np.mean(20*np.log10(np.abs(my_y)))
        except ValueError:
            metric = 0
            # print('hey')
        figure_of_merit.loc[{coord: figure_of_merit[coord][i] for coord, i in zip(figure_of_merit.coords.keys(), I)}] = metric

    topKperformances = np.sort(figure_of_merit.data, axis=None)[::-1][:top_k]
    topKidxs = np.array(np.unravel_index(np.argsort(figure_of_merit.data, axis=None)[::-1][:top_k], figure_of_merit.shape)).T
    
    topKparameters = []
    # print(topKidxs)
    
    for idxs in topKidxs:
        # print(idxs)
        topKparameters.append({})
        for coord, i in zip(figure_of_merit.coords.keys(), idxs):
            topKparameters[-1][str(coord)]=  float(figure_of_merit[coord][i])

    
    return topKperformances, topKparameters, figure_of_merit

In [8]:
def plot_everything(file_coarse, 
                    file_fine, 
                    device_name=None, 
                    top_k=2, 
                    eval_func=GainMap, 
                    **eval_func_kwargs):
    max1, best_pump_parameters, figure_of_merit = GainMap(file_coarse, top_k=1, **eval_func_kwargs)
    max2, best_pump_parameters2, figure_of_merit2 = GainMap(file_fine, top_k=top_k, **eval_func_kwargs)
    S21_fine = xr.open_dataset(file_fine, engine='h5netcdf')['S21'].rf.LogMag()
    fig, (ax1, ax2, ax3) = plt.subplots(ncols=3, figsize=[10, 3], dpi=200)

    figure_of_merit.metroplot(vmin=0, vmax=np.fmax(max1[0], max2[0]), cmap='SQE@INRiM', x=str(list(figure_of_merit[0].coords.keys())[0]), ax =ax1)
    # ax1.plot([list(best_pump_parameters.values())[0]], [list(best_pump_parameters.values())[1]], 'rx')
    xy = list(best_pump_parameters[0].values())[0]-35e6, list(best_pump_parameters[0].values())[1]-2.5
    rect = patches.Rectangle(xy, 70e6, 5, linewidth=1, edgecolor='r', facecolor='none', linestyle='--', clip_on=False, alpha=0.75, lw=.75)
    ax1.add_patch(rect)
    c = figure_of_merit2.metroplot(vmin=0, vmax=np.fmax(max1[0], max2[0]), cmap='SQE@INRiM', x=str(list(figure_of_merit[0].coords.keys())[0]), ax =ax2)
    x_coords = c.get_paths()[0].vertices[:, 0]
    y_coords = c.get_paths()[0].vertices[:, 1]
    pixel_width = np.diff(np.unique(x_coords)).mean()
    pixel_height = np.diff(np.unique(y_coords)).mean()
    cmap = mpl.colormaps['brg']
    colors = cmap(np.linspace(0.5, 1, top_k))
    gain_profile = []
    mov_avg = []

    for max, pump_pars, color in zip(max2, best_pump_parameters2, colors):
        xy2 = list(pump_pars.values())[0] - pixel_width/2, list(pump_pars.values())[1] - pixel_height/2
        rect2 = plt.Rectangle(
            xy2,
            pixel_width, 
            pixel_height, 
            edgecolor=color, 
            facecolor='none', 
            linewidth=.75,
            alpha=.75
            )
        ax2.add_patch(rect2)

        label = figure_of_merit2[0].attrs['long_name'] + f' = {max:.2f} GHz with '
        for key, val in pump_pars.items():
            label += f'{val:.5g} {figure_of_merit2[0][key].attrs["unit"]}, '
        # print(pump_pars)
        S21_fine.sel(pump_pars).metroplot(ax=ax3, color=color, label=label[:-2], lw=0.7) #Return a new DataArray whose data is given by selecting index labels along the specified dimension(s).
        #ax3.plot(S21_fine["Frequency"].data, padded_moving_average(S21_fine.sel(pump_pars)), label="MA") #sistemare label
        gain_profile.append(S21_fine.sel(pump_pars))
        mov_avg.append(padded_moving_average(S21_fine.sel(pump_pars)))
    
    ax3.legend(loc='lower center', bbox_to_anchor=(0.5, 1))
    ax3.grid()
    ax3.set_title('')
   
    if device_name is not None:
        ax2.set_title(device_name)
    fig.tight_layout()

    plt.subplots(figsize=[10, 3], dpi=200)
    for i, color in zip(range(len(mov_avg)), colors):
        plt.plot(S21_fine["Frequency"].data, mov_avg[i], color=color, label=f'MA profilo {i+1}')
    plt.grid()
    plt.legend()
    plt.xlabel("Frequency / Hz")
    plt.ylabel("|S21| / dB")
    plt.tight_layout()
    plt.title("Moving Average for each Profile")
    
    for j in range(len(best_pump_parameters2)):
        print(f'List of parameters {j+1}: {best_pump_parameters2[j]}')
    return best_pump_parameters2, gain_profile, mov_avg

In [9]:
pump_pars, g_prof, m_avg = plot_everything(
    r"Data/ARGO SW2311019A - Coarse gain tune up.h5", 
    r"Data/ARGO SW2311019A - Fine gain tune up.h5", 
    "ARGO SW2311019A", 
    phase_matching_scheme='Dispersion engineering', 
    top_k=5, w=0.4e9, min_prominence=0.3, cut_off=1e9)

FileNotFoundError: [Errno 2] Unable to open file (unable to open file: name = 'c:\Users\Mattia\OneDrive\Documenti\GitHub\Data_Analysis\Data\ARGO SW2311019A - Coarse gain tune up.h5', errno = 2, error message = 'No such file or directory', flags = 0, o_flags = 0)

In [None]:
S21_trial = xr.open_dataset(r"Data/ARGO SW2311019A - Fine gain tune up.h5", engine='h5netcdf')['S21'].rf.LogMag()

In [None]:
massimo, frequenza = fft_on_diff(S21_trial.sel(pump_pars[0]), padded_moving_average(S21_trial.sel(pump_pars[0])))

In [None]:
w = best_window(massimo, frequenza)
print(w)

In [None]:
#double smoothing

plt.subplots(figsize=[10, 3], dpi=200)
#plt.plot(S21_trial["Frequency"].data, S21_trial.sel(pump_pars[0]), color="violet", label="Original")
#plt.plot(S21_trial["Frequency"].data, padded_moving_average(S21_trial.sel(pump_pars[4]), w), color="red", label="Single smoothing")
plt.plot(S21_trial["Frequency"].data, padded_moving_average(padded_moving_average(S21_trial.sel(pump_pars[4]), w), w), color="green", label="Double smoothing")
plt.grid()
plt.legend()
plt.xlabel("Frequency / HZ")
plt.ylabel("|S21| / dB")
plt.show()



---

In [None]:
#proviamo a fare queste operazioni con crescendo

pump_pars_c, g_prof_c, m_avg_c = plot_everything(
    r"Data/CRESCENDO V14W15F1- Coarse gain tune up.h5", 
    r"Data/CRESCENDO V14W15F1- Fine gain tune up.h5", 
    "CRESCENDO V14W15F1", 
    phase_matching_scheme='Dispersion engineering', 
    top_k=5, w=0.4e9, min_prominence=0.3, cut_off=1e9)

In [None]:
crescendo = xr.open_dataset(r"Data/CRESCENDO V14W15F1- Fine gain tune up.h5", engine="h5netcdf")['S21'].rf.LogMag()

In [None]:
massimo_c, frequenza_c = fft_on_diff(crescendo.sel(pump_pars_c[4]), padded_moving_average(crescendo.sel(pump_pars_c[4])))

In [None]:
w_c = best_window(massimo_c, frequenza_c)
print(w_c)

In [None]:
#Conclusions
#La finestra più adatta per approssimare i profili sembra essere window_opt = 11
#Con questa scelta, la costante di proporzionalità alpha si può prendere circa pari a 1/50 (0.02)

#quindi la formula finale per la scelta della finestra è data da:
#window_opt = alpha * massimo * frequenza / 16e6

In [None]:
#eventuali aggiunte per smussare ancora il profilo

In [None]:
#double smoothing

plt.subplots(figsize=[10, 3], dpi=200)
#plt.plot(crescendo["Frequency"].data, crescendo.sel(pump_pars_c[3]), color="violet", label="Original")
#plt.plot(crescendo["Frequency"].data, padded_moving_average(crescendo.sel(pump_pars_c[3]), w_c), color="red", label="Single smoothing")
plt.plot(crescendo["Frequency"].data, padded_moving_average(padded_moving_average(crescendo.sel(pump_pars_c[4]), w_c), w_c), color="green", label="Double smoothing")
plt.grid()
plt.legend()
plt.xlabel("Frequency / HZ")
plt.ylabel("|S21| / dB")
plt.show()