# Import packages

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import math
import time

from sklearn.model_selection import train_test_split
from IPython.display import display, clear_output

import atomai as aoi

In [None]:
import aespm as ae

# Make the connection and custom functions

In [None]:
host = ''
username = ''
password = ''

folder = r"C:\Users\Asylum User\Documents\Asylum Research Data\240410"

exp = ae.Experiment(folder=folder, connection=[host, username, password])

In [None]:
# let's read some useful parameters first

xsens, ysens= ae.read_spm(key=['XLVDTSens', 'YLVDTSens'], connection=exp.connection)
exp.update_param('sensitivity', value=[xsens, ysens])

exp.param['sensitivity']

## Custom functions for taking measurements

In [None]:
## Commonly used custom functions

def read_data(self, folder="C:\\Users\\Asylum User\\Documents\\AEtesting\\data_exchange"):
    '''
    Read the latest ibw file saved in a given folder.
    '''
    fname = ae.get_files(path=folder, client=self.client)[0]
    return ae.ibw_read(fname, connection=self.connection)

exp.add_func(read_data)

# Function to move the probe with the given displacement 
def move_probe(self, distance, v0=None, s=None):
    
    # Enable the stage move --> 5 sec, 8 seconds for safety
    ae.move_tip(r=distance, v0=v0, s=s, connection=self.connection)

exp.add_func(move_probe)

# Function to check the file number in a given folder
def check_files(self):
    return ae.check_file_number(path=self.folder, connection=self.connection)
exp.add_func(check_files)

def load_ibw_parameter(self, data):
    
    scan_angle = data.header['ScanAngle']
    xpixels, ypixels = data.header['PointsLines'],data.header['ScanPoints']
    xsize, ysize = data.header['FastScanSize'],data.header['SlowScanSize']

    xfactor = xsize / xpixels
    yfactor = ysize / ypixels
    
    p = {
        'ScanAngle': scan_angle,
        'xpixels': xpixels,
        'ypixels': ypixels,
        'xsize': xsize,
        'ysize': ysize,
        'xfactor': xfactor,
        'yfactor': yfactor,
        'center': np.array([xsize, ysize]) / 2,
    }
    
    for key in p:
        self.update_param(key=key, value=p[key])
exp.add_func(load_ibw_parameter)

def convert_coord(self, coord):
    '''
    Convert the coordinate from pixel to distance.
    Apply rotation if needed.
    '''
    x, y = coord
    
    if len(np.shape(coord)) == 1:
        x, y = [x], [y]
    scan_angle = self.param['ScanAngle']
    
    # Convert angle to radians
    theta_rad = np.radians(-scan_angle)
    
    # Create 2D rotation matrix
    rot_matrix = np.array([[np.cos(theta_rad), -np.sin(theta_rad)],
                           [np.sin(theta_rad), np.cos(theta_rad)]])
    
    # Apply the rotation matrix to the coordinates
    center = (np.array([self.param['xpixels'], self.param['ypixels']])-1) // 2
    x_rot, y_rot = np.zeros_like(x), np.zeros_like(y)
    for i in range(len(x)):
        x_rot[i], y_rot[i] = np.dot(rot_matrix, (np.array([x[i], y[i]])-center)) + center
    
    # Convert the pixels to the distance
    xfactor, yfactor = self.param['xfactor'], self.param['yfactor']

    positions = np.zeros([len(x), 2])

    for i in range(len(x)):
        positions[i] = np.array([x_rot[i] * xfactor, y_rot[i] * yfactor])

    # Sort the positions according to x first and y second
    pos_sorted = sorted(positions, key=lambda x: (x[1], x[0]))
    return pos_sorted[0]

exp.add_func(convert_coord)

def measure_spectrum(self, coord, basename, index, retry=100):
    
    x, y = self.convert_coord(coord)
    r = np.array([x, y]) - self.param['center']
    self.move_probe(distance=r, v0=self.param['v0'], s=self.param['sensitivity'])
    self.execute('IVDoItDART')
    
    time.sleep(5)
    retries = 0
    while retries < retry:
        try:
            ae.download_file(connection=self.connection, file_path=os.path.join(self.folder, '{}_{:04d}.ibw'.format(basename, index)), 
                          local_file_name='spec.ibw')
            d = load_ibw(file='spec.ibw', ss=True)
            return d
        except FileNotFoundError:
            retries += 1
            time.sleep(2)
            
exp.add_func(measure_spectrum)

In [2]:
def scal(obj):
    index = len(obj.bias) // 4
    index_up = [2*index, -1]
    index_down = [index, 3*index]
    return np.abs(np.mean(obj.amp_off[index_up])-np.mean(obj.amp_off[index_down]))

2

## Custom functions for DKL

In [None]:
def generate_seed(self, basename, num_seed=None, show=True):

    if num_seed == None:
        num_seed = self.param['seed_points']
        
    y_measured_unnor = []
    y_measured_raw = []

    exp.execute('ChangeName', value=basename+'_')

    for i in range (num_seed):
        print("Working on {}/{} points".format(i+1, num_seed), end='\r')
        
        if i % 10 == 0: # Re-tune the probe every 10 measurements at the center of the image
            exp.execute('ClearForce')
            exp.execute('GoThere', wait=1)
            ae.tune_probe(num=1, center=320e3, width=50e3, connection=self.connection)
            
        x, y = float(indices_measured[i][0]), float(indices_measured[i][1])
        
        spec = self.measure_spectrum([x, y], basename=basename, index=i) # ask Richard
        scalarizer_y = scal(spec)
        y_measured_unnor.append(scalarizer_y)

        if show:
            clear_output(wait=True)
            plt.figure(figsize=[4,4])
            plt.plot(spec.bias, spec.amp_off, '.-')
            plt.xlabel('Sample Bias (V)')
            plt.ylabel('Piezo response (a.u.)')
            plt.show()
            
        self.update_param('y_measured_unnor', y_measured_unnor)
        self.update_param('y_measured_raw', y_measured_raw)
        self.update_param('num_seed', num_seed)
        self.update_param('y_measured', np.asarray(y_measured_unnor) * 1e10)
        
exp.add_func(generate_seed)

In [None]:
def run_dkl(self, basename, exploration_steps=200, num_cycles=1000, acquisition='EI', xi=0.025, show=True, save=None):

    data_dim = self.param['X_unmeasured'].shape[-1]
    window_size = self.param['window_size']
    exp.execute('ChangeName', value=basename+'_')
    
    for e in range(exploration_steps):
        
        print("Working on: {}/{}".format(e+1, exploration_steps), end='\r')
        if e % 10 == 0: # Re-tune the probe every 10 measurements at the center of the image
            exp.execute('ClearForce')
            exp.execute('GoThere', wait=1)
            ae.tune_probe(num=1, center=320e3, width=50e3, connection=exp.connection)
        # Update GP posterior
        dklgp = aoi.models.dklGPR(data_dim, embedim=2, precision="single")
        dklgp.fit(self.param['X_measured'], self.param['y_measured'], training_cycles=num_cycles)
        mean, var = dklgp.predict(self.param['X_unmeasured'], batch_size=len(self.param['X_unmeasured']))
        
        # Compute acquisition function
        obj, next_point_idx = dklgp.thompson(self.param['X_unmeasured'])
        # obj, next_point_idx = dklgp.EI(self.param['X_unmeasured'])
        next_point = self.param['indices_unmeasured'][next_point_idx]
        print(next_point_idx)
        # Do "measurement"
        x, y = float(next_point[0, 0]), float(next_point[0, 1])
        to_plot = self.measure_spectrum([x, y], basename=basename, index=e)
        # array = to_plot.amp_off
        
        measured_point = scal(to_plot) * 1e10
        
        # Plot current result
        # Update train and test datasets
        self.update_param('X_measured', np.append(X_measured, X_unmeasured[next_point_idx][None], 0))
        self.update_param('X_unmeasured', np.delete(X_unmeasured, next_point_idx, 0))
        self.update_param('y_measured', np.append(y_measured, measured_point))
        self.update_param('indices_measured', np.append(indices_measured, next_point[None], 0))
        self.update_param('indices_unmeasured', np.delete(indices_unmeasured, next_point_idx, 0))

        if show:
            size_x = int(np.sqrt(mean.shape[0]))
            
            clear_output(wait=True)
            fig, ax = plt.subplots(2,2,figsize=[6,6])
            ax[0, 0].plot(to_plot.bias, to_plot.amp_off, '.-')
            ax[0, 0].set_xlabel('Sample Bias (V)')
            ax[0, 0].set_ylabel('Piezo response (a.u.)')
                          
            ax[0, 1].imshow(img, origin='lower')
            ax[0, 1].set_title('Height')
        
            ax[1, 0].imshow(mean.reshape(size_x, size_x), origin='lower')
            ax[1, 0].set_title('Prediction')
        
            ax[1, 1].imshow(var.reshape(size_x, size_x), origin='lower')
            ax[1, 1].set_title('Uncertainty')
        
            plt.tight_layout()
            plt.show()
    if save is not None:
        # Save final traindata
        np.savez("results_PTO_20240401.npz", 
                 X_measured = self.param['X_measured'], 
                 X_unmeasured = self.param['X_unmeasured'],
                 y_measured = self.param['y_measured'], 
                 indices_measured = self.param['indices_measured'], 
                 indices_unmeasured = self.param['indices_unmeasured'])

exp.add_func(run_dkl)

# Acquire a scan

In [None]:
exp.execute('DownScan', wait=1.5)
exp.check_files()

In [None]:
image = exp.read_data(folder=exp.folder)
exp.load_ibw_parameter(image)

# Use the topography map as the structural map
img = image.data[0]

plt.imshow(image.data[0], origin='lower')

In [None]:
# Generate image patches and corresponding indices

window_size = 16
exp.update_param('window_size', window_size)
coordinates = aoi.utils.get_coord_grid(img, step=1, return_dict=False)
# extract subimage for each point on a grid
features_all, coords, _ = aoi.utils.extract_subimages(img, coordinates, window_size)

features_all.shape, coords.shape

In [None]:
features_all = features_all[:,:,:,0]
coords = np.array(coords, dtype=int)
indices_all = coords

print(coords.shape)
print(features_all.shape)

# see a patch : what atomai gave
k = 200

f, (ax1, ax2) = plt.subplots(1, 2, figsize=(6, 3), dpi =100)
ax1.imshow(img)
ax1.scatter(coords[k, 1], coords[k, 0], marker='X', s=50, c='r')


ax2.imshow(features_all[k])

In [None]:
norm_ = lambda x: (x - x.min()) / x.ptp()
features = norm_(features_all)

n, d1, d2 = features.shape
X = features.reshape(n, d1*d2)
X.shape

# use only 0.02% of grid data points as initial training points
(X_measured, X_unmeasured, indices_measured, indices_unmeasured) = train_test_split(
      X, indices_all, test_size=0.99975, shuffle=True, random_state=5)

seed_points = len(X_measured)


np.savez("seeds_PTO_20240415_1.npz", X_measured = X_measured, X_unmeasured = X_unmeasured, 
         indices_measured = indices_measured, indices_unmeasured = indices_unmeasured)

p = {
    'X_measured': X_measured,
    'X_unmeasured': X_unmeasured,
    'indices_measured': indices_measured,
    'indices_unmeasured': indices_unmeasured,
    'features_all': features_all,
    'X': X,
    'indices_all': indices_all,
    'seed_points': seed_points
}

for key in p:
    exp.update_param(key, p[key])

seed_points

## Generate seed

In [None]:
# Move the probe to the center of the image, and record the X Y sensor readings there
action_list = [
    ['ClearForce', None, None], # Clear any existing force points
    ['GoThere', None, 1], # Move to the center of the image
]

exp.execute_sequence(action_list)

v0 = read_spm(key=['PIDSLoop.0.Setpoint', 'PIDSLoop.1.Setpoint'], connection=exp.connection)

exp.update_param('v0', v0)

In [None]:
exp.generate_seed('PTO_seed')

## Run the DKL

In [None]:
exp.run_dkl('PTO_DKL1', num_cycles=200, save='test')