# **Strategic (S) Autonomous (A) Noisy (N) Experiments (E)**
Nov, 2024

This is the notebook contains only the developed functions for SANE

-- Notebook prepared by **Arpan Biswas**





Import required packages

In [None]:
import gpax
import numpy as np
import matplotlib.pyplot as plt
import scipy
import os
import h5py
import sidpy
import time

from mycolorpy import colorlist as mcp

gpax.utils.enable_x64()  # enable double precision

from warnings import filterwarnings

import math


from sklearn.model_selection import train_test_split

from atomai.utils import get_coord_grid, extract_patches_and_spectra, extract_subimages

import h5py
import sidpy
import pyUSID as usid

List of functions for SANE module
1. Human assessment (Initial data)
2. prediction model for constraint
3. Hybrid cost based acquisiton function
4. Function to check for the region of interest
5. Prediction model for function evaluation
6. Plot function


In [None]:
import time
# Function for perform voting to assess spectral quality
def votes_initialdata(img, X_indices, data, idx):
    beps_wv= data[0]
    spectra = data[1]
    idx_good = []
    idx_bad = []
    for i in range(0, len(idx)):
        #plt.figure(figsize = (5, 4))
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(8, 4), dpi=200)
        ax1.imshow(img, origin = "lower")
        ax1.scatter(X_indices[i, 0], X_indices[i, 1], color = 'red', marker = 'x', s =50)
        #ax1.set_xlabel("Composition")
        #ax1.set_ylabel("y")
        #ax1.legend()
        ax1.set_title("Initial target values, location"+str(X_indices[i,:]))
        ax2.plot(beps_wv, spectra[int(X_indices[i, 0]), int(X_indices[i, 1]), :], linewidth = 2)
        ax2.set_title("Initial spectral")
        ax2.set_xlabel("Wavelength")
        ax2.set_ylabel("Polarization")
        plt.show()

        # User votes the quality of the spectral- Domain knowledge injection in BO initialization
        print("Is the current spectral good? y- Yes or press ENTER- No")
        time.sleep(0.5)
        ip = input("Answer: ")
        if (ip == 'Y' or ip == 'y'):
            idx_good = np.hstack((idx_good, idx[i]))
            #idx_good.append(idx[i])
        else:
            idx_bad = np.hstack((idx_bad, idx[i]))
            #idx_bad.append(idx[i])
    idx_good = idx_good.astype(int)
    idx_bad = idx_bad.astype(int)

    return idx_good, idx_bad

# Function for perform GP-- to contrain between good and bad region
def run_constraindkl (xtrain, ytrain, xtest, fn = None, fn_priors = None):
    rng_key, rng_key_predict = gpax.utils.get_keys(1000)
    data_dim = xtrain.shape[-1]
    # update GP posterior
    dkl = gpax.viDKL(data_dim, 2, kernel='RBF')
    dkl.fit(  # you may decrease step size and increase number of steps (e.g. to 0.005 and 1000) for more stable performance
        rng_key, xtrain, ytrain, num_steps=1000, step_size=0.005)

    y_pred, y_var = dkl.predict(rng_key_predict, xtest)

    return dkl, y_pred, y_var

# Function for perform constrained cost based acq function
def cost_acqfun(models, train_x, Data, cost_params, params, isnorm=False):
    """
    cost based acquisition function- EI based
    Args:
        model: krigging model
        train_x: training data
        Data: Unexplored data to evaluate for next sample selection in list with actual values and normalized values
        cost_params: Params required for cost aware acquisition function
        params: Params required for standard EI acq function
        ieval: All the current locations explored
        isnorm: If the data need to be normalized. Default is False

    Returns:
        array of acquisition function values
    """

    data_real = Data[0] # True data
    data_norm = Data[1] #Normalize data
    indices_real = Data[2] # indices data

    best_x = cost_params[0]
    exp_x = cost_params[1]
    a = cost_params[2]
    b = cost_params[3]
    switch = cost_params[4]

    if isnorm == False:
        data = data_real
        # print(data)
    else:
        data = data_norm
        # print(data)

    dkl_model = models
    # Compute standard acq function
    standard_acq = gpax.acquisition.EI(params, dkl_model, data, maximize=True, noiseless=True)
    standard_acq = np.array(standard_acq)

    # Check for stability
    standard_acq[np.isnan(standard_acq)==True] = np.random.uniform(low=0.0, high=1.0, size=standard_acq[np.isnan(standard_acq)==True].shape)*1e-3
    # Eliminate evaluated samples from consideration to avoid repeatation in future sampling
    #standard_acq[ieval] = 0
    # Compute cost on non-normalized data
    prev_x = train_x[-1,:]
    x_center = best_x
    l_exp = len(exp_x)-1
    cost = np.zeros((standard_acq.shape[0], 2))
    c = np.zeros((standard_acq.shape[0]))

    for i in range(indices_real.shape[0]):
        cost[i, 0] = a*(np.absolute(indices_real[i,0]-x_center[0])+np.absolute(indices_real[i,1]-x_center[1])) # encourage local exploration

        for j in range(l_exp): # encourage already invested locations of potential region of interest when global exploration is continued
          cost[i, 1] = cost[i, 1] + (np.absolute(indices_real[i,0]-train_x[exp_x[j], 0])+np.absolute(indices_real[i,1]-train_x[exp_x[j], 1]))
          cost[i, 1] = cost[i, 1]*b
        if l_exp > 0:
          cost[i, 1] = cost[i, 1]/l_exp
          #cost[:, 1] = 0
        c[i] = -cost[i, 0] + cost[i, 1]
        #if c[i] < 0:
        #  c[i] = 0.1

    c_norm = (c-np.min(c))/(np.max(c)-np.min(c))

    if (np.max(standard_acq)+1e-5)/((np.sum(standard_acq))+1e-5) >= 0.75:
        print("Full local exploration initiated")
        #standard_acq = 0*standard_acq
        cost_acq = c_norm # discourage already invested locations of potential region of interest when global learning is completed

    else:
        if switch == 1: #Explorative
          cost_acq = standard_acq*(np.sum(cost, axis=1)/np.max(np.sum(cost, axis=1)))
        else: #Exploitative
          cost_acq = standard_acq/(np.sum(cost, axis=1)/np.max(np.sum(cost, axis=1)))
        #c=-c+1
        #cost_acq = standard_acq/c
        #standard_acq = (standard_acq - np.min(standard_acq))/(np.max(standard_acq)- np.min(standard_acq))
        #c= 1-c
        #cost_acq = standard_acq*c*100

    return cost_acq

# Function to check region of interest/given the region is a good region
def check_ROI(train_x, train_y, best_x, best_y, exp_x):
    #best_y2 = np.max(train_y[exp_x[-1]+1:])
    #best_x2 = train_x[np.argmax(train_y[exp_x[-1]+1:])]
    #Find Euclidean distance between best x and the later explored x

    l = len(train_x[exp_x[-1]+1:])
    l_exp = len(exp_x)-1
    dist_x = np.zeros(l)
    dist_y = np.zeros(l)
    dist_z = np.zeros(l)
    for i in range(l):
        dist_x[i] = np.absolute(best_x[0] - train_x[exp_x[-1]+i+1,0]) + np.absolute(best_x[1] - train_x[exp_x[-1]+i+1,1])
        dist_y[i] = train_y[exp_x[-1]+i+1]/best_y
        for j in range(l_exp): # discourage already invested locations of potential region of interest
          dist_z[i] = dist_z[i] + np.absolute(train_x[exp_x[-1]+i+1,0] - train_x[exp_x[j],0]) + np.absolute(train_x[exp_x[-1]+i+1,1] - train_x[exp_x[j],1])
        dist_z[i] = dist_z[i]/l_exp

    dim_x = len(best_x)
    #print(dim_x)

    F = (dist_x * dist_y * dist_z)/dim_x
    F_max = np.max(F)
    F_imax= np.argmax(F)


    prob_ROI = (dist_x[F_imax] + dist_y[F_imax])/dim_x
    if prob_ROI > 1:
        prob_ROI = 1
    #print(F_max, prob_ROI)
    #D = np.max(dist_x/dim_x) #Measure closeness

    #O = np.max(dist_y) #Measure optimality
    # We want to maximize D and O
    #print(D, O)
    #prob_ROI = min(D, O)
    #print(prob_ROI)
    #Probabilistic choice of identify the next best solution as a potential ROI
    #np.random.seed(1000)
    if np.random.choice(np.arange(2), size=1, p =[1-prob_ROI, prob_ROI]) == 1: #New ROI
        exp_x = np.hstack((exp_x, int(exp_x[-1]+F_imax+1)))
        print("New potential region of interest found with prob=", prob_ROI)
    else:
        exp_x = exp_x #No new ROI
        print("No region of interest found")


    return exp_x


# Function for performing a GP step, here we use structure GP (sGP)
def run_statdKL (xtrain, ytrain, xindices, Data, fn = None, fn_priors = None, constraint = None, pen = 1, cost_params = None, isnorm=False):
  data_real = Data[0] # True data
  data_norm = Data[1] #Normalize data


  rng_key, rng_key_predict = gpax.utils.get_keys(1000)

  data_dim = xtrain.shape[-1]
  # update posterior
  dkl = gpax.viDKL(data_dim, 2, kernel='Matern')
  dkl.fit(  # you may decrease step size and increase number of steps (e.g. to 0.005 and 1000) for more stable performance
      rng_key, xtrain, ytrain, num_steps=1000, step_size=0.005)

  # Compute acquisition function
  #obj = gpax.acquisition.UCB(rng_key_predict, gp_model, X_test)
  cost_acq = cost_acqfun(dkl, xindices, Data, cost_params, rng_key_predict)

  if constraint is not None:
    cpen = np.ceil(np.max(cost_acq))
    #print(cpen, np.max(cost_acq), np.max(constraint), np.min(constraint))
    for i in range(0, len(constraint)):
      if constraint[i] <= 0:
        cost_acq[i] = cost_acq[i] + (pen*cpen*constraint[i])

  # Get GP prediction
  if isnorm == False:
    y_pred, y_var = dkl.predict(rng_key_predict, data_real)

  else:
    y_pred, y_var = dkl.predict(rng_key_predict, data_norm)


  return dkl, cost_acq, y_pred, y_var


def plot_result(indices, obj, data):
    beps_wv= data[0]
    spectra = data[1]
    for i in range(0,len(obj)):
      if obj[i] < 0:
        obj[i] = 0
    plt.figure(figsize = (4, 4))
    a = plt.scatter(indices[:, 0], indices[:, 1], s=10, c=obj)
    plt.colorbar(a)
    next_point = indices[obj.argmax()]
    plt.scatter(next_point[0], next_point[1], s=10, marker='s', c='r', linewidths=2)
    #plt.xticks([])
    #plt.yticks([])
    plt.title("Acquisition Values")
    plt.show()

    plt.figure(figsize = (4, 4))
    plt.plot(beps_wv, spectra[int(next_point[0]), int(next_point[1]), :], linewidth = 2)
    plt.title("spectral at location:" +str(next_point))
    plt.xlabel("Wavelength")
    plt.ylabel("Polarization")
    plt.show()

Script for human assesment to contraint metric

In [None]:
y_vote = np.zeros(len(y_measured))
for i in range(0, len(y_vote)):
    y_good = 0
    y_bad = 0
    for j1 in range(0, len(idx_good)):
      y_good = y_good + np.absolute(indices_measured[i, 0] -  indices_all[idx_good[j1], 0]) + np.absolute(indices_measured[i, 1] -  indices_all[idx_good[j1], 1])
    y_good = y_good/len(idx_good)
    #print(y_good)

    for j2 in range(0, len(idx_bad)):
      y_bad = y_bad + np.absolute(indices_measured[i, 0] -  indices_all[idx_bad[j2], 0]) + np.absolute(indices_measured[i, 1] -  indices_all[idx_bad[j2], 1])
    y_bad = y_bad/len(idx_bad)
    #print(y_bad)

    y_vote[i] = y_bad - y_good # Turning into maximization problem


print(idx_good, idx_bad, y_vote)

SANE parameter settings

In [None]:
#Swtiching parameter
k = np.zeros(100)
k[60:] = k[60:] + 1
plt.plot(k)

# Below parameters are currently fixed as 1
c_loc = 1
c_glo = 1

Full workflow is provided in the Analysis folder for each test cases