In [None]:
import tensorflow as tf
import numpy as np
from tensorflow import keras
from keras import models, Model, layers
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten, Lambda
from keras import backend as K
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
from sklearn.preprocessing import MinMaxScaler, StandardScaler, minmax_scale
from sklearn.decomposition import PCA
import math
from scipy.interpolate import griddata
from scipy.integrate import cumulative_trapezoid
import seaborn as sns
from matplotlib import cm
import pandas as pd
import pickle

In [None]:
'''define parameters here'''
chain_lenght = 21               #spin chain lenght
nodes = [500,200,100]           #network nodes
epochs = 600                    #epochs
batch_size = 100                #batch
outputlayer_shape = 4           #number of parameters to predict
num_of_dcs = 3                  #number of different dcs
num_of_Es = 266                 #number of energies in dI/dV curve
es_dI2d2V = np.linspace(-0.5,4,300)
es_dIdV = np.linspace(0,4,266)
include_gamma = False           #include gamma to the training
d_ind = (1, 0.05)               #dropout layer index and %, no dropout = (0,0)
loss = 'mean_squared_error'     #loss function                                     
n_params = 6                    #number of parameters
SEED = 3                        #random seed for tensorflow
noise_seed = 1                  #random seed for noisy data
include_all = False             #training with all datasets/only single one, True/False
PCA_noise = 3                   #amount of gaussian noise in PCA fitting
RELU = True                     #relu activation function
noise_scaler = 0.01             #noise scaler for noisy copies
noise_levels = [level for level in range(11)] # #noise levels for noisy copies in training set

tf.config.experimental.enable_op_determinism()        #deterministic in order to repeat results
tf.random.set_seed(SEED)                              #SEED for tensorflow   

network = f'{noise_seed}_{SEED}'                      #network name with params
name = f'SingleImp'                                   #name of the network for saving files


In [None]:
def plot_heat(chain_lenght: int, dcs: np.array, es: np.linspace, num_of_dcs: int, num_of_Es : int, sample:  int, title: str, norm_max: float, method: str, directions: list):
    '''Function to plot heatmaps of dynamical correlators.
    Args:
        chain_lenght (int): Length of the spin chain.
        dcs (np.array): Array of dynamical correlators.
        es (np.linspace): Energy values.
        num_of_dcs (int): Number of dynamical correlators in the dataset (1, 2, or 3).
        num_of_Es (int): Number of energy values.
        sample (int): Sample index.
        title (str): Title for the plot.
        norm_max (float): Maximum normalization value for the plot.
        method (str): Interpolation method for the griddata function.
        directions (list): List of directions for the dynamical correlators.
    Returns:
        None: The function saves the plot as a PNG file and displays it.

    This function performs the following steps:
    1. Takes the chain length, dynamical correlators, energy values, number of dynamical correlators, number of energies, sample index, 
       title for the plot, maximum normalization value, interpolation method, and directions.
    2. Loops over each dynamical correlator direction.
    3. For each direction, it creates a file "XYZ.OUT" to store the data.
    4. Loops over each site in the chain and for each energy value, it writes the site number, energy value, 
       and corresponding dynamical correlator value to the file.'''

    '''loop over dynamical correlators'''
    custom_cmap = cm.get_cmap('inferno')  


    for dc in range(num_of_dcs):
        f = open("XYZ.OUT","w")

        '''loop over sites'''
        for s in range(chain_lenght): 
            for idx, freq in enumerate(es):
                D = dcs[sample,(num_of_dcs*s+dc)*num_of_Es:(num_of_dcs*s+dc)*num_of_Es+num_of_Es]
                
                f.write(str(s)+"  ")
                f.write(str(freq)+"  ")
                f.write(str(D[idx])+"\n")

        f.close()

        '''create mesh'''
        x = np.linspace(0,chain_lenght-1,num_of_Es)
        y =  es
        x_mesh, y_mesh = np.meshgrid(x,y)

        d = np.genfromtxt("XYZ.OUT").T
        norm = plt.Normalize(0,norm_max)

        mesh_plot = plt.figure()
        z_mesh = griddata((d[0], d[1]), d[2], (x_mesh, y_mesh), method=method)
        plt.contourf(x_mesh, y_mesh, z_mesh, 100, norm=norm, cmap=custom_cmap)
        plt.tick_params(axis='both', which='major', labelsize=16)
        
        c = plt.colorbar()
        c.ax.tick_params(labelsize=16)


        plt.title(f"{title} {directions[dc]}", fontsize=18)
        plt.xlabel('Site number', fontsize=16)
        plt.ylabel('Energy [E]', fontsize=16)
        plt.savefig(f'spinchain{chain_lenght}_mesh_plot_{dc}_{sample}_{directions[dc]}_.png', dpi=300)
        plt.show()

def extend_with_reversed_subsets(arr, num_of_energies):
    ''''Function to extend dynamical correlators for the whole symmetric chain.
    Arguments:
        arr (np.array): Input array with shape (n_samples, n_features). Input array has dynamical correlators for one half of the chain in one spatial direction.
        num_of_energies (int): Size of the subsets to create. Grid point number in the dynamical correlators.
    Returns:
        np.array: Extended array with shape (n_samples, new_features). Output array has dynamical correlators for the whole chain.

    This function performs the following steps:
    1. Takes an array of dynamical correlators for one half of the chain.
    2. Creates subsets of the array based on the number of energies.
    3. Reverses the subsets and concatenates them to create an extended array for the whole chain.
    4. Returns the extended array with dynamical correlators for the whole chain.
    5. The output array has dynamical correlators for the whole chain, first x and then z direction for each spin.

    Args:
        arr (np.array): Input array with shape (n_samples, n_features). Input array has dynamical correlators for one half of the chain in one spatial direction.
        num_of_energies (int): Size of the subsets to create. Grid point number in the dynamical correlators.
    Returns:
        np.array: Extended array with shape (n_samples, new_features). Output array has dynamical correlators for the whole chain'''
    
    num_cols = arr.shape[1]                
    num_spins = num_cols // num_of_energies   
    subsets = [arr[:, i*num_of_energies:(i+1)*num_of_energies] for i in range(num_spins)]     #create subset 
    reversed_subsets = subsets[:-1][::-1]                                                     #exclude last subset and reverse
    extended_dIdV = np.hstack(subsets + reversed_subsets)                                     #concatenate data

    return extended_dIdV

def include_dc(dcs_all, dc_dir:int, chain_length, n_d, n_E):
    '''Function to include a specific dynamical correlator direction in the dataset.
    Args:
        dcs_all (np.array): Array of dynamical correlators with shape (n_samples, n_d * n_E * chain_length).
        dc_dir (int): Index of the dynamical correlator direction to include.
        chain_length (int): Length of the spin chain.
        n_d (int): Number of dynamical correlators.
        n_E (int): Number of energy values.
    Returns:
        np.array: Array with the specified dynamical correlator direction for all chains.
        
    This function performs the following steps:
    1. Takes an array of dynamical correlators, direction index, chain length, number of dynamical correlators, and number of energies.
    2. Initializes an empty array to hold the selected dynamical correlator direction for all chains.
    3. Iterates through each chain and extracts the specified dynamical correlator direction.
    4. Concatenates the extracted dynamical correlator direction for all chains into a single array.
    5. Returns the concatenated array with the specified dynamical correlator direction for all chains.'''
    
    dc_dir_ALL = np.zeros((dcs_all.shape[0], n_E * chain_length))

    for s in range(chain_length):
        start_idx = s * n_d * n_E + dc_dir * n_E
        end_idx = start_idx + n_E
        dc = dcs_all[:, start_idx:end_idx]
        dc_dir_ALL[:, s * n_E:(s + 1) * n_E] = dc

    return dc_dir_ALL

def compute_cumulative_integral(arr, sites, n_dcs, n_E_final, final_linspace, group_size=300):
    '''Function to compute the cumulative integral of dI2/d2V for each sample in the dataset.
    Args:
        arr (np.array): Array of dI2/d2V values with shape (n_samples, n_features).
        sites (int): Number of sites in the spin chain.
        n_dcs (int): Number of dynamical correlators.
        n_E_final (int): Number of final energy values.
        final_linspace (np.linspace): Final linspace for the energy values.
    group_size (int): Size of the groups to split the array into. Default is 300.

    Returns:
        np.array: Array with cumulative integrals for each sample with shape (n_samples, n_E_final * sites * n_dcs).
        
    This function performs the following steps:
    1. Takes an array of dI2/d2V values, number of sites, number of dynamical correlators, number of final energies, and the final linspace.
    2. Splits the array into chunks based on the group size.
    3. Computes the cumulative integral for each chunk using the trapezoidal rule.
    4. Concatenates the results to create a final array with the cumulative integral for each sample.
    5. Returns the final array with cumulative integrals for each sample.
    '''
    dc_dir_ALL = np.zeros((arr.shape[0],n_E_final*sites*n_dcs))
    
    for sample in range(arr.shape[0]):
        chunks = [arr[sample][i:i + group_size] for i in range(0, arr.shape[1], group_size)]
        result = [chunk[(group_size-n_E_final):] for chunk in chunks if len(chunk) >= group_size]
        
        def compute_sum(dI2_d2V, es):
            return cumulative_trapezoid(dI2_d2V, es, initial=0)
        
        dI_dVs = compute_sum(result[:], final_linspace)
        dI_dV  = np.array(np.concatenate(dI_dVs))
        dc_dir_ALL[sample] = dI_dV
        
    return dc_dir_ALL


In [None]:
'''download data and split train and test'''
if include_all:
    data10 = np.genfromtxt(f'dIdV_impurity_at_10.txt')
    data911 = np.genfromtxt(f'dIdV_impurity_at_9and11.txt')
    data812 = np.genfromtxt(f'dIdV_impurity_at_8and12.txt')

    params = data812[:, len(data812[0])-n_params:]      #parameters are the same for all datasets, take from one

    dcs_x_10  = data10[:, :2926]                         #dynamical correlators for x direction
    dcs_z_10  = data10[:, 2926:-6]                       #dynamical correlators for z direction
    dcs_x_911 = data911[:, :2926]
    dcs_z_911 = data911[:, 2926:-6]
    dcs_x_812 = data812[:, :2926]
    dcs_z_812 = data812[:, 2926:-6]

    dcs_x_10 = extend_with_reversed_subsets(dcs_x_10, 266)      #extend to whole chain
    dcs_z_10 = extend_with_reversed_subsets(dcs_z_10, 266)
    dcs_x_911 = extend_with_reversed_subsets(dcs_x_911, 266)
    dcs_z_911 = extend_with_reversed_subsets(dcs_z_911, 266)
    dcs_x_812 = extend_with_reversed_subsets(dcs_x_812, 266)
    dcs_z_812 = extend_with_reversed_subsets(dcs_z_812, 266)

    dcs_x = np.concatenate((dcs_x_10, dcs_x_911, dcs_x_812), axis=1)
    dcs_z = np.concatenate((dcs_z_10, dcs_z_911, dcs_z_812), axis=1)

    dIdV_and_params = np.concatenate((dcs_x, dcs_z, params), axis=1)

    print(dIdV_and_params.shape)
    train_data                  = dIdV_and_params[:dIdV_and_params.shape[0]-dIdV_and_params.shape[0]//3, :]
    test_data                   = dIdV_and_params[dIdV_and_params.shape[0]-dIdV_and_params.shape[0]//3:, :]



else: 
    dIdV_and_params = np.genfromtxt(f'dIdV_impurity_at_10.txt')

    params = dIdV_and_params[:, len(dIdV_and_params[0])-n_params:]
    dcs_x_10 = dIdV_and_params[:, :2926]
    dcs_z_10 = dIdV_and_params[:, 2926:-6]

    dcs_x_10 = extend_with_reversed_subsets(dcs_x_10, 266)
    dcs_z_10 = extend_with_reversed_subsets(dcs_z_10, 266)

    dIdV_and_params = np.concatenate((dcs_x_10, dcs_z_10, params), axis=1)

    train_data                  = dIdV_and_params[:dIdV_and_params.shape[0]-dIdV_and_params.shape[0]//3, :]
    test_data                   = dIdV_and_params[dIdV_and_params.shape[0]-dIdV_and_params.shape[0]//3:, :]


    


In [None]:
def gaussian_noise(x, mu, std, seed):
    '''Function to add Gaussian noise to the input data.
    Args:
        x (np.array): Input data array with shape (n_samples, n_features).
        mu (float): Mean of the Gaussian noise.
        std (float): Standard deviation of the Gaussian noise.
        seed (int): Random seed for reproducibility.
    Returns:
        np.array: Noisy data array with the same shape as the input data.'''

    if seed != None:
        np.random.seed(seed)
    noise = np.random.normal(mu, std, size = x.shape)
    x_noisy = x + noise
    return x_noisy

def noisy_copies(raw_data, n_copies, n_variations, n_params, n_levels, seed):
    '''Function to create noisy copies of the raw data.
    Args:
        raw_data (np.array): Original data array with shape (n_samples, n_features).
        n_copies (int): Number of copies to create.
        n_variations (int): Number of variations for each copy.
        n_params (int): Number of parameters in the dataset.
        n_levels (list): List of noise levels to apply.
        seed (int): Random seed for reproducibility.
    Returns:
        np.array: Noisy data array with shape (n_copies * n_variations, n_features).

    This function performs the following steps:
    1. Takes the raw data, number of copies, number of variations, number of parameters, noise levels, and seed.
    2. Creates a noisy version of the raw data by tiling it according to the number of copies and variations.
    3. Extracts the dynamical correlators and parameters from the noisy data.
    4. Applies Gaussian noise to the dynamical correlators based on the specified noise levels.
    5. Returns the modified dynamical correlators and parameters as separate arrays.
    '''

    noisy_data           = np.tile(raw_data, (n_copies*n_variations,1))
    copied_dcs           = np.array(noisy_data[:,0:len(noisy_data[0])-n_params])
    
    copied_dcs[copied_dcs < 0] = 0
    params               = noisy_data[:,len(noisy_data[0])-n_params:]
    
    for level in n_levels:
        if level == 0:
            continue
        
        s_ind = level*copied_dcs.shape[0]//n_copies
        e_ind = (level+1)*copied_dcs.shape[0]//n_copies
        
        copied_dcs[s_ind:e_ind,:] = gaussian_noise(np.copy(copied_dcs[s_ind:e_ind,:]), mu=0.0, std=level * noise_scaler * np.std(copied_dcs), seed = seed)

    return copied_dcs, params


In [None]:
'''generate noisy training set'''
dcs_train, params_train     = noisy_copies(train_data, 11, 1, 6, noise_levels, seed=noise_seed)

dcs_test                    = test_data[:,0:-n_params]
params_test                 = test_data[:,-n_params:]

dcs_train[dcs_train < 0]    = 0
dcs_test[dcs_test < 0]      = 0

dcs_train                   = np.array(dcs_train)
dcs_test                    = np.array(dcs_test)

J1_train      = params_train[:,0]
J2_train      = params_train[:,1] 
JZ_train      = params_train[:,2]
JD_train      = params_train[:,3]
J3_train      = params_train[:,4]
gamma_train   = params_train[:,5]

J1_test      = params_test[:,0]
J2_test      = params_test[:,1] 
JZ_test      = params_test[:,2]
JD_test      = params_test[:,3]
J3_test      = params_test[:,4]
gamma_test   = params_test[:,5]


'''fit PCA on noisy (chi=0.03) data and apply it to train and test sets'''
pca = PCA(0.99)
dcs_for_PCA = dcs_train[PCA_noise*280:(PCA_noise+1)*280,:]
pca.fit(dcs_for_PCA)

dcs_train_arr               = pca.transform(dcs_train)
dcs_test_arr                = pca.transform(dcs_test)

dcs_PCA_train   = np.copy(dcs_train_arr)
dcs_PCA_test    = np.copy(dcs_test_arr)
pca_components  = dcs_PCA_train.shape[1]

with open(f'pca_{name}.pkl', 'wb') as pickle_file: 
    pickle.dump(pca, pickle_file)
print(f"number of PCA components used: {pca_components}")


In [None]:
'''define scalers'''
scaler_J2   = MinMaxScaler(feature_range=(0,1)) 
scaler_JZ  = MinMaxScaler(feature_range=(0,1))
scaler_J3 = MinMaxScaler(feature_range=(0,1))
scaler_JD   = MinMaxScaler(feature_range=(0,1))

'''scale train params'''
J2_train          = np.array(J2_train).reshape(-1,1)
JZ_train          = np.array(JZ_train).reshape(-1,1)
JD_train          = np.array(JD_train).reshape(-1,1)
J3_train          = np.array(J3_train).reshape(-1,1)

'''scale test params'''
J2_test          = np.array(J2_test).reshape(-1,1)
JZ_test          = np.array(JZ_test).reshape(-1,1)
JD_test          = np.array(JD_test).reshape(-1,1)
J3_test          = np.array(J3_test).reshape(-1,1)

scaler_J2.fit(np.concatenate((J2_train, J2_test), axis = 0))
scaler_JZ.fit(np.concatenate((JZ_train, JZ_test), axis = 0))
scaler_J3.fit(np.concatenate((J3_train, J3_test), axis = 0))
scaler_JD.fit(np.concatenate((JD_train, JD_test), axis = 0))

J2_scaled_train   = scaler_J2.transform(J2_train)
JZ_scaled_train   = scaler_JZ.transform(JZ_train)
J3_scaled_train  = scaler_J3.transform(J3_train)
JD_scaled_train   = scaler_JD.transform(JD_train)

J2_scaled_test   = scaler_J2.transform(J2_test)
JZ_scaled_test   = scaler_JZ.transform(JZ_test)
J3_scaled_test  = scaler_J3.transform(J3_test)
JD_scaled_test   = scaler_JD.transform(JD_test)



In [None]:
def baseline_model(list :list, input_shape, output_shape, d_ind, loss):
    model = Sequential()
    '''loop over nodes in the list'''
    for idx, node in enumerate(list):

        if idx == 0:
            model.add(Dense(node, input_shape=input_shape, kernel_initializer=tf.keras.initializers.GlorotUniform(seed=SEED) , activation='relu'))
            continue

        '''adding dropout layer before dense layer (number idx)'''
        if idx == d_ind[0] and idx != 0:
            model.add(Dropout(rate=d_ind[1], noise_shape=None, seed=None))

        model.add(Dense(node, kernel_initializer=tf.keras.initializers.GlorotUniform(seed=SEED), activation='relu'))

    model.add(Dense(output_shape, kernel_initializer=tf.keras.initializers.GlorotUniform(seed=SEED), activation= 'relu'))
    model.compile(loss=loss, optimizer='adam', metrics=["mae", "mse"])
    return model 

def plot_loss(history,name, log):
    plt.plot(history.history['loss'], label='loss')
    plt.plot(history.history['val_loss'], label='val_loss')
    if log == True:
        plt.yscale('log')
    plt.title('Loss errors as functions of epochs',fontsize=18)
    plt.xlabel('Epoch',fontsize=18)
    plt.ylabel('Error',fontsize=18)
    plt.legend(fontsize=14)
    plt.grid(True)
    plt.savefig(f'{name}_loss', dpi = 300)
    plt.show()


def fit_and_evaluate(nodes, epochs, batch_size, X_test_scaled, y_test_scaled, X_train_scaled, y_train_scaled, input_shape, output_shape, d_ind, loss, name, log:bool):
    NN_model = baseline_model(nodes, input_shape, output_shape, d_ind, loss)
    history = NN_model.fit(X_train_scaled, y_train_scaled, epochs=epochs, batch_size=batch_size, validation_split=0.1)
    results = NN_model.evaluate(X_test_scaled, y_test_scaled)
    print("test loss, test mae:", results)
    NN_model.save(f'{name}.h5')

    plot_loss(history,name,log)

    J_predicted = NN_model.predict(X_test_scaled)
    J_predicted_training = NN_model.predict(X_train_scaled)

    return J_predicted, J_predicted_training

def MSE(true,pred,param='none'):
    MSE = mean_squared_error(y_true=true, y_pred=pred)
    return float(format(MSE, ".4f"))

def MAE(true,pred,param='none'):
    MAE = mean_absolute_error(y_true=true, y_pred=pred)
    return float(format(MAE, ".4f"))


def compute_fidelity(true:np.array, pred:np.array):

    E_pred_true    = np.mean(np.multiply(pred, true))
    E_pred         = np.mean(pred)
    E_true         = np.mean(true)
    E_pred2        = np.mean(np.multiply(pred, pred))
    E_true2        = np.mean(np.multiply(true, true))
    
    fidelity       = abs(E_pred_true - E_pred*E_true) / math.sqrt((E_true2 - E_true**2)*(E_pred2 - E_pred**2))
    return float(format(fidelity, ".4f"))


In [None]:
print(f"train size {dcs_PCA_train.shape}, test shape {dcs_PCA_test.shape}")

inputlayer_shape    = (pca_components,)
X_train_scaled      = dcs_PCA_train
X_test_scaled       = dcs_PCA_test

y_train_scaled      = np.concatenate((J2_scaled_train, JZ_scaled_train, JD_scaled_train, J3_scaled_train), axis=1)
y_test_scaled       = np.concatenate((J2_scaled_test, JZ_scaled_test, JD_scaled_test, J3_scaled_test), axis=1)


In [None]:
'''fit and evaluate the model'''
J_predicted, J_predicted_training = fit_and_evaluate(nodes, epochs, batch_size, X_test_scaled, y_test_scaled, X_train_scaled, y_train_scaled, inputlayer_shape, outputlayer_shape, d_ind, loss, name, True)

In [None]:
'''MSE and MAE of predictions'''
print(MSE(J2_scaled_test, J_predicted[:,0:1], 'test J2'))
print(MSE(J2_scaled_train, J_predicted_training[:,0:1], 'train J2'))
print(MAE(J2_scaled_test, J_predicted[:,0:1], 'test J2'))
print(MAE(J2_scaled_train, J_predicted_training[:,0:1], 'train J2'))

print(MSE(JZ_scaled_test, J_predicted[:,1:2], 'test JZ'))
print(MSE(JZ_scaled_train, J_predicted_training[:,1:2], 'train JZ'))
print(MAE(JZ_scaled_test, J_predicted[:,1:2], 'test JZ'))
print(MAE(JZ_scaled_train, J_predicted_training[:,1:2], 'train JZ'))

print(MSE(JD_scaled_test, J_predicted[:,2:3], 'test JD'))
print(MSE(JD_scaled_train, J_predicted_training[:,2:3], 'train JDMI'))
print(MAE(JD_scaled_test, J_predicted[:,2:3], 'test JD'))
print(MAE(JD_scaled_train, J_predicted_training[:,2:3], 'train JDMI'))

print(MSE(J3_scaled_test, J_predicted[:,3:4], 'test J3'))
print(MSE(J3_scaled_train, J_predicted_training[:,3:4], 'train J3'))
print(MAE(J3_scaled_test, J_predicted[:,3:4], 'test J3'))
print(MAE(J3_scaled_train, J_predicted_training[:,3:4], 'train J3'))


In [None]:
'''scale back to original range'''
J2_predicted_inverse_scaled     = scaler_J2.inverse_transform(J_predicted[:,0:1])
JZ_predicted_inverse_scaled     = scaler_JZ.inverse_transform(J_predicted[:,1:2])
JD_predicted_inverse_scaled     = scaler_JD.inverse_transform(J_predicted[:,2:3])
J3_predicted_inverse_scaled     = scaler_J3.inverse_transform(J_predicted[:,3:4])

J2_predicted_training_scaled    = scaler_J2.inverse_transform(J_predicted_training[:,0:1])
JZ_predicted_training_scaled    = scaler_JZ.inverse_transform(J_predicted_training[:,1:2])
JD_predicted_training_scaled    = scaler_JD.inverse_transform(J_predicted_training[:,2:3])
J3_predicted_training_scaled    = scaler_J3.inverse_transform(J_predicted_training[:,3:4])

J2_test     = scaler_J2.inverse_transform(J2_scaled_test)
JZ_test     = scaler_JZ.inverse_transform(JZ_scaled_test)
JD_test     = scaler_JD.inverse_transform(JD_scaled_test)
J3_test     = scaler_J3.inverse_transform(J3_scaled_test)
J2_train    = scaler_J2.inverse_transform(J2_scaled_train)
JZ_train    = scaler_JZ.inverse_transform(JZ_scaled_train)
JD_train    = scaler_JD.inverse_transform(JD_scaled_train)
J3_train    = scaler_J3.inverse_transform(J3_scaled_train)


In [None]:
def plot_baselines(baselines: list, name: str, chi: float, NN_name: str):
    """Function to plot baselines of predicted values compared to true values.
    Args:
        baselines (list): List of tuples containing parameter name, true values, and predicted values.
        name (str): Name for saving the plot.
        chi (float): Random noise value used in the dataset.
    """
    fig, axes = plt.subplots(nrows=2, ncols=len(baselines)//2, figsize = (10,5))
    axes = axes.flatten()

    for idx, pred in enumerate(baselines):
        axes[idx].scatter(pred[1], pred[2], c = 'blue')
        axes[idx].set_title(rf'{pred[0]} [J$_1$]', fontsize=25)
        axes[idx].plot(pred[1], pred[1], color='red')
        fid = compute_fidelity(pred[2], pred[1])
        axes[idx].text(0.2, 0.8, r'$\mathcal{F}$'+ f' :{float(format(fid, ".2f"))}', fontsize= 20, transform=axes[idx].transAxes, horizontalalignment='center', verticalalignment='center') 
        axes[idx].tick_params(axis='both', which='major', labelsize=23)
    
    fig.supxlabel('True values', fontsize=25, y=0.02)
    fig.supylabel('Predicted values', fontsize=25, x=0.05)
    fig.suptitle(rf'Predicted parameters with $\chi$={chi} for {NN_name}', fontsize=25)
    plt.subplots_adjust(left=0.17, bottom=0.18, right=0.98, top=0.82, wspace=0.35, hspace=0.66)
    plt.savefig(f'baselines_'+name+'.png', dpi = 500)
    plt.show()


In [None]:

baselines=[('J2', J2_test, J2_predicted_inverse_scaled), 
           ('JZ', JZ_test, JZ_predicted_inverse_scaled), 
           ('JDMI', JD_test, JD_predicted_inverse_scaled), 
           ('J3', J3_test, J3_predicted_inverse_scaled)]

plot_baselines(baselines=baselines, name='scatter', chi=0, NN_name=name)

predicted_data_save = np.concatenate((J2_predicted_inverse_scaled, JZ_predicted_inverse_scaled, JD_predicted_inverse_scaled, J3_predicted_inverse_scaled), axis=1)
np.savetxt('predicted_results_'+name+'.txt', np.array(predicted_data_save))


In [None]:
model = tf.keras.models.load_model(f'/Users/35840/Documents/CQM_2024/masters_project/{name}.h5')

with open(f'pca_{name}.pkl', 'rb') as pickle_file: 
     pca_test = pickle.load(pickle_file)

def compute_fid_with_seed(test, test_params, model_fid, chi_range, pca_i):
    '''Function to compute fidelity of predictions with added noise and specified seed.
    Args:
        test (np.array): Test data array with shape (n_samples, n_features).
        test_params (np.array): Test parameters array with shape (n_samples, n_params).
        model_fid (tf.keras.Model): Trained model for predictions.
        chi_range (np.array): Array of noise levels to apply.
        pca_i (PCA): PCA object for transforming the test data.
    Returns:
        tuple: Tuple containing lists of fidelities and errors for each parameter, as well as noise values and parameter values.
    '''
    # Define ranges and initialize lists for results
    fidelity_results = {'J2': [], 'JZ': [], 'JDM': [], 'J3': []}
    error_results = {'J2': [], 'JZ': [], 'JDM': [], 'J3': []}

    # Inverse transform test parameters
    J2_test = scaler_J2.inverse_transform(test_params[:, 0:1].reshape(-1,1))
    JZ_test = scaler_JZ.inverse_transform(test_params[:, 1:2].reshape(-1,1))
    JD_test = scaler_JD.inverse_transform(test_params[:, 2:3].reshape(-1,1))
    J3_test = scaler_J3.inverse_transform(test_params[:, 3:4].reshape(-1,1))

    def compute_and_store_fidelity(fids, pred, true):
        fids.append(compute_fidelity(true, pred))
        
    # Iterate over the noise range
    for i, g in enumerate(chi_range):
        dcs_gaussian = gaussian_noise(np.copy(test), mu=0.0, std= g * np.std(test), seed=i)

        dcs_test_noisy_transformed = pca_i.transform(dcs_gaussian)
        noisy_results = model_fid.predict(dcs_test_noisy_transformed)

        # Inverse transform predictions
        J2_pred_from_noise = scaler_J2.inverse_transform(noisy_results[:, 0:1])
        JZ_pred_from_noise = scaler_JZ.inverse_transform(noisy_results[:, 1:2])
        JD_pred_from_noise = scaler_JD.inverse_transform(noisy_results[:, 2:3])
        J3_pred_from_noise = scaler_J3.inverse_transform(noisy_results[:, 3:4])

        # Compute and store fidelities
        compute_and_store_fidelity(fidelity_results['J2'], J2_pred_from_noise, J2_test)
        compute_and_store_fidelity(fidelity_results['JZ'], JZ_pred_from_noise, JZ_test)
        compute_and_store_fidelity(fidelity_results['JDM'], JD_pred_from_noise, JD_test)
        compute_and_store_fidelity(fidelity_results['J3'], J3_pred_from_noise, J3_test)

        if g == 1:
            '''Plotting scatter plots for noisy data with chi=1.0'''

            baselines=[('J2', J2_test, J2_pred_from_noise), 
            ('JZ', JZ_test, JZ_pred_from_noise), 
            ('JDMI', JD_test, JD_pred_from_noise), 
            ('J3', J3_test, J3_pred_from_noise)]

            print(test_params[4,:])
            plot_baselines(baselines=baselines, name=f'TEST', chi=g, NN_name=name)
            predicted_data_NOISYsave = np.concatenate((J2_pred_from_noise, J2_test, JZ_pred_from_noise, JZ_test, JD_pred_from_noise, JD_test, J3_pred_from_noise, J3_test), axis=1)
            #np.savetxt('predicted_results_'+name+'_noisy.txt', np.array(predicted_data_NOISYsave))
        
    return fidelity_results['J2'], fidelity_results['JZ'], fidelity_results['JDM'], fidelity_results['J3'], \
           error_results['J2'], error_results['JZ'], error_results['JDM'], error_results['J3']

chi_range  = np.linspace(0,3,31)

J2_fids, JZ_fids, JDM_fids, J2Z_fids, J2_errors, JZ_errors, JDM_errors, J2Z_errors= compute_fid_with_seed(dcs_test, y_test_scaled, model, chi_range, pca_test)

In [None]:

def compute_fid_stochastic(test, test_params, model_fid, chi_range, pca_i,
    MAE, MSE, compute_fidelity, gaussian_noise,
    scalers, param_keys, indices,
    n_runs=10, run_seed=None, results_csv="all_metrics.csv"
    ):
    '''Function to compute fidelity metrics over multiple runs and noise levels.
    Args:
        test (np.array): Test dataset.
        test_params (np.array): True parameters for the test dataset.
        model_fid (tf.keras.Model): Trained model for predictions.
        chi_range (list): List of noise levels to apply.
        pca_i (PCA): PCA object for data transformation.
        MAE (function): Function to compute Mean Absolute Error.
        MSE (function): Function to compute Mean Squared Error.
        compute_fidelity (function): Function to compute fidelity.
        gaussian_noise (function): Function to add Gaussian noise to data.
        scalers (dict): Dictionary of scalers for inverse transformation.
        param_keys (list): List of parameter keys.
        indices (dict): Dictionary mapping parameter keys to their indices in the predictions.
        n_runs (int): Number of runs to perform.
        run_seed (int): Seed for random number generation.
        results_csv (str): Filename to save the results CSV.
    Returns:
        pd.DataFrame: DataFrame containing all computed metrics.
        
    The function performs the following steps:
    1. Initializes an empty list to store all metrics.
    2. Inverse-transforms the true parameters to their original range.
    3. For each run, iterates over the specified noise levels.
    4. Adds Gaussian noise to the test data.
    5. Transforms the noisy data using PCA and makes predictions using the trained model.
    6. Inverse-transforms the predictions to the original parameter range.
    7. Computes MAE, MSE, and fidelity for each parameter.'''

    all_metrics = []

    # computing test values (inverse-transformed to original range)
    true_params = {
        key: scalers[key].inverse_transform(test_params[:, [idx]])
        for key, idx in indices.items()
    }

    for run in range(n_runs):
        print(f"Starting run {run + 1}/{n_runs}...")
        for g in chi_range:
            test_noisy = gaussian_noise(np.copy(test), mu=0.0, std=g * np.std(test), seed=run_seed)

            transformed = pca_i.transform(test_noisy)
            predictions = model_fid.predict(transformed)

            # inverse transforming predictions
            pred_params = {
                key: scalers[key].inverse_transform(predictions[:, [indices[key]]])
                for key in param_keys
            }

            # computing metrics
            mae_vals = {
                key: MAE(true=test_params[:, [indices[key]]], pred=predictions[:, [indices[key]]])
                for key in param_keys
            }

            mse_vals = {
                key: MSE(true=test_params[:, [indices[key]]], pred=predictions[:, [indices[key]]])
                for key in param_keys
            }

            fid_vals = {
                key: compute_fidelity(true=true_params[key], pred=pred_params[key])
                for key in param_keys
            }

            # savong all to metrics list
            all_metrics.append({
                "run": run,
                "g": g,
                **{f"MAE_{key}": mae_vals[key] for key in param_keys},
                **{f"MSE_{key}": mse_vals[key] for key in param_keys},
                **{f"FID_{key}": fid_vals[key] for key in param_keys},
            })

    #creating a DataFrame and save to CSV
    df = pd.DataFrame(all_metrics)
    df.to_csv(results_csv, index=False)
    print(f"Saved all metrics to {results_csv}")

    return df

param_keys = ['J2', 'JZ', 'JDM', 'J3']
indices = {'J2': 0, 'JZ': 1, 'JDM': 2, 'J3': 3}
scalers = {
    'J2': scaler_J2,
    'JZ': scaler_JZ,
    'JDM': scaler_JD,
    'J3': scaler_J3
}

model = tf.keras.models.load_model(f'/Users/35840/Documents/CQM_2024/masters_project/{name}.h5')

df_all = compute_fid_stochastic(
    test=dcs_test,
    test_params=y_test_scaled,
    model_fid=model,
    chi_range=np.linspace(0, 3, 31),
    pca_i=pca,
    MAE=MAE,
    MSE=MSE,
    compute_fidelity=compute_fidelity,
    gaussian_noise=gaussian_noise,
    scalers=scalers,
    param_keys=param_keys,
    indices=indices,
    n_runs=10,
    run_seed=None,
    results_csv=f"{name}.csv"
)

