The notebook supports the tutorial publication ... The code includes the setup for running Ansys Lumerical simulations through the python API, executing a multiobjective optimization, as well as executing adjoint optimizations. Those can be used independently or in order to generate the data that is used to train neural network models.

It has been tested with Python version =  3.9.20 and Ansys Lumerical v 2.4.1.

@author: Grinbergy

In [None]:
### Setup the environment 

import sys
import os

sys.path.append('C:\\Program Files\\Lumerical\\v241\\api\\python')  # pointer to the Python API folder of your Ansys Lumerical installation

import numpy as np
from lumopt.utilities.wavelengths import Wavelengths
from lumopt.geometries.polygon import FunctionDefinedPolygon
from lumopt.utilities.simulation import Simulation
from lumopt.figures_of_merit.modematch import ModeMatch
from lumopt.optimizers.generic_optimizers import ScipyOptimizers
from lumopt.optimization import Optimization, SuperOptimization
from lumopt.utilities.load_lumerical_scripts import load_from_lsf


prj_dir = os.getcwd()
n_params = 10 # number of parameters that define the shape
param_bounds = [(0.1e-6, 2.2e-6)] * n_params # min and max values for each parameter
Mode_names = ['TE0','TE1','TE2'] # appear as part of the mode expansion monitors in lsf file - "mode_expansion_XXX"

total_len = 6e-6 # device length
input_wg_width = 0.5e-6 # should be the same as the lsf file
output_wg_width = 1e-6 # should be the same as the lsf file

# geometry function
# params represents [params_top, params_bottom]
def mode_exchanger_geom(params):

    # num of parameters must be even
    assert (len(params)/2).is_integer()

    Nx = int(len(params)/2)
    dx = total_len / (Nx+1)

    points_x = np.concatenate(
        ([-total_len / 2], np.linspace(-total_len / 2 + dx, total_len / 2 - dx, Nx), [total_len / 2]))
    points_y_top = np.concatenate(([input_wg_width/2], params[:Nx], [output_wg_width/2]))
    points_y_bottom = np.concatenate(([input_wg_width/2], params[Nx:], [output_wg_width/2]))

    px = np.linspace(points_x[0], points_x[-1], 100)
    py_top = np.interp(px, points_x, points_y_top)
    py_bottom = np.interp(px, points_x, points_y_bottom)
    py_top[0] = points_y_top[0]
    py_top[-1] = points_y_top[-1]
    py_bottom[0] = points_y_bottom[0]
    py_bottom[-1] = points_y_bottom[-1]

    # new linear interpolation with filtering
    # add 10 extra points per side to ensure smooth boundaries
    # remember that points_x[0] is negative
    px_long = np.concatenate((np.array([-10, -9, -8, -7, -6, -5, -4, -3, -2, -1]) * dx + points_x[0], px,
                              np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) * dx + points_x[-1]))
    py_long_top = np.concatenate((np.ones((10,)) * points_y_top[0], py_top, np.ones((10,)) * points_y_top[-1]))
    py_long_bottom = np.concatenate((np.ones((10,)) * points_y_bottom[0], py_bottom, np.ones((10,)) * points_y_bottom[-1]))
    
    filt = np.exp(-np.power(px_long, 2.) / (2 * np.power(0.5e-7, 2.))) # gaussian smoothing 
    filtered_py_top = np.convolve(py_long_top, filt, mode='same')
    filtered_py_bottom = np.convolve(py_long_bottom, filt, mode='same')
    filtered_py_top = filtered_py_top / np.mean(filtered_py_top) * np.mean(py_long_top)
    filtered_py_bottom = filtered_py_bottom / np.mean(filtered_py_bottom) * np.mean(py_long_bottom)
    filtered_py_top = filtered_py_top[10:-10]
    filtered_py_bottom = filtered_py_bottom[10:-10]
    
    # filtered_py[0] = py[0]
    # filtered_py[-1] = py[-1]
    py_top = filtered_py_top  # py #filtered_py
    py_bottom = filtered_py_bottom  # py #filtered_py

    polygon_points_up = [(x, y) for x, y in zip(px, py_top)]
    polygon_points_down = [(x, -y) for x, y in zip(px, py_bottom)]
    polygon_points = np.array(polygon_points_up[::-1] + polygon_points_down)
    return polygon_points

global_sample_function = lambda: np.array([np.round(np.random.uniform(min_val, max_val),10) for min_val, max_val in param_bounds])
init_params = global_sample_function().flatten()

geometry = FunctionDefinedPolygon(func=mode_exchanger_geom,
                                  initial_params=init_params,
                                  bounds=param_bounds,
                                  z=0,
                                  depth=220e-9,
                                  eps_out=1.44 ** 2,
                                  eps_in=2.8 ** 2,
                                  edge_precision=5,
                                  dx=1e-9)

In [None]:
### Load Lumerical lsf file and add random geometry, then save to the fsp file that is later used to run simulations
### This is also used for testing to make sure everything is smooth in defining the gemoetry
sim = Simulation(prj_dir,
                 False, False)
sim.fdtd.switchtolayout()
sim.fdtd.eval(f'mode_exchanger;')
geometry.add_geo(sim, init_params, False)
sim.save(f"ModeExchanger.fsp")

In [None]:
#### For testing - simulate one structure and output the results

# note that the output of LUMOPT produces values that have min-bound subtracted! You would need to add "param_bounds[0][0]"
params = np.array([4.839e-07,7.772e-07,1.4823e-06,1.3544e-06,8.078e-07,5.083e-07,4.839e-07,7.772e-07,1.4823e-06,1.3544e-06]) 

sim = Simulation(prj_dir,
                 False, False)
sim.load(f"ModeExchanger.fsp")
sim.fdtd.switchtolayout()

# delete old shape if exists
script=('selectpartial("polygon");' +
        'delete;')
sim.fdtd.eval(script)

# add new geometry
geometry.add_geo(sim, params, False)
sim.fdtd.run()

FOMs = []
for TE_name in Mode_names:
        exp_TE = sim.fdtd.getresult("mode_expansion_" + TE_name, "expansion for input")
        data_TE = np.mean(exp_TE["T_net"])

        # round to 10^-4 resolution
        data_TE = np.round(data_TE,4)
        FOMs.append(data_TE)    

output = str([f'{Mode_names[i]} = {FOMs[i]} ' for i in range(len(Mode_names))])
output = output.translate(str.maketrans('', '', "'[]"))
print(output)

sim.fdtd.close()

In [None]:
### Setup for pyMOO

from pymoo.algorithms.moo.nsga2 import NSGA2
from pymoo.optimize import minimize
from pymoo.core.problem import Problem
import csv
import os
import shutil

seed = 2 # random seed for reproducibility, also allows to restart the optimization from the last point it stopped (if lumopt failed)
n_Population = 50
n_Generations = 10
parallel = 4 # number of parallel simulations to run
fsp_prefix = f'analysis'

FOM_store = ['TE0','TE1','TE2'] # all FOMs to compute and store (we may want to store more data than what we optimize for)
FOM_opt_id = [0,1,2] # indexes of FOMs that the optimizer will optimize from FOM_store

csv_file = f'data_p{n_Population}_g{n_Generations}_s{seed}_FOM[{[FOM_store[i] for i in FOM_opt_id]}].csv' # output file

# create the fsp files for parallel simulations if not exist yet
source_path = os.path.join(prj_dir,f"ModeExchanger.fsp")
for i in range(parallel):
    if not os.path.exists(os.path.join(prj_dir,fsp_prefix + str(i) + '.fsp')):
        try:
            shutil.copy(source_path, os.path.join(prj_dir,fsp_prefix + str(i) + '.fsp'))
        except Exception as e:
            print("An error occurred while copying the file:", e)

In [None]:
### Run pyMOO

def load_existing_results(file_path):
    """
    Load already computed results from the CSV file.
    Returns a dictionary with tuples of the input array as keys and result as values.
    """
    if not os.path.exists(file_path):
        return {}

    results = {}
    with open(file_path, mode='r') as file:
        reader = csv.reader(file)
        
        for line_number, row in enumerate(reader, start=1):  # Start line numbers from 1
            if line_number == 1: # skip the header
                continue

            # Convert row to array and result
            input_array = tuple(map(float, row[:-len(FOM_store)]))  # All but the last three columns are the input
            result = [float(row[n_params+i]) for i in range(len(FOM_store))]  # Last column is the result
            results[input_array] = (result,line_number)
    return results


def check_for_duplicates(file_path):
    """
    Check the CSV file for duplicate inputs .
    """
    if not os.path.exists(file_path):
        print("CSV file does not exist.")
        return

    unique_entries = {}
    duplicates = False

    with open(file_path, mode='r') as file:
        reader = csv.reader(file)
        header_skipped = False

        for row in reader:
            if not header_skipped:
                header_skipped = True
                continue

            input_array = tuple(map(float, row[:-1]))
            result = float(row[-1])
            # Store only the first occurrence of each input
            if input_array in unique_entries:
                duplicates = True
                print("Duplicate: " + str(input_array))

    return duplicates

# Problem definition for PyMOO
class MyProblem(Problem):

    def __init__(self, **kwargs):
        super().__init__(n_var=n_params, n_obj=len(FOM_opt_id), n_ieq_constr=0, 
                         xl=[bound[0] for bound in param_bounds], 
                         xu=[bound[1] for bound in param_bounds], 
                         **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):

        id_nxt = 0 # simultion progress index
        sim = Simulation(prj_dir, False, False)

        FOMs = np.zeros((X.shape[0],len(FOM_opt_id)))

        headers = ["param_" + str(i) for i in range(n_params)] + FOM_store
        
        # round X to the 10^-10 resolution
        X = np.round(X,10)

        # Check if the file exists and load the results
        file_exists = os.path.exists(csv_file)
        processed_results = load_existing_results(csv_file)

        # save the rng state before calling sim functions (they alter the state for some reason!)
        rng_state = np.random.get_state()

        # one iteration of this loop executes up to *parallel* number of simulations
        while id_nxt < X.shape[0]:

            all_found = True # if all simulations in this iteration have already been precomputed
            for i in range(id_nxt,min(X.shape[0],id_nxt+parallel)):
                params = X[i,:]
                
                input_tuple = tuple(params)  # Convert array to tuple for comparison
                if input_tuple not in processed_results:
                    all_found = False

            # load precomputed results instead of running Lumerical
            if all_found:
                for i in range(id_nxt,min(X.shape[0],id_nxt+parallel)):
                    params = X[i,:]
                    input_tuple = tuple(params)  # Convert array to tuple for comparison
                    res, lineID = processed_results[input_tuple]
                    print(f'line loaded: {lineID}')
                    FOMs[i,:] = np.array([res[j] for j in FOM_opt_id])

            # run missing simulations
            else:
                # prepare the sim files
                for i in range(id_nxt,min(X.shape[0],id_nxt+parallel)):
                    params = X[i,:]

                    sim.load(fsp_prefix + str(i % parallel) + ".fsp")
                    sim.fdtd.switchtolayout()
                    script=('selectpartial("polygon");' +
                            'delete;')
                    sim.fdtd.eval(script)
                    geometry.add_geo(sim, params, False)
                    sim.save(fsp_prefix + str(i % parallel) + ".fsp")


                fsp_files = [fsp_prefix + str(n) + ".fsp" for n in range(parallel)]
                for fsp_file in fsp_files:
                    sim.fdtd.addjob(fsp_file)
        
                # Run the job queue
                sim.fdtd.runjobs()

                # read the output from simulations
                for i in range(id_nxt,min(X.shape[0],id_nxt+parallel)):
                    params = X[i,:]
                    
                    # Read the simulation results only if they are not already stored
                    input_tuple = tuple(params)  # Convert array to tuple for comparison
                    if input_tuple in processed_results:
                        input_tuple = tuple(params)  # Convert array to tuple for comparison
                        res, lineID = processed_results[input_tuple]
                        print(f'line loaded: {lineID}')
                        FOMs[i,:] = np.array([res[j] for j in FOM_opt_id])
                        continue

                    sim.load(fsp_prefix + str(i % parallel) + ".fsp")

                    FOMs_all = []
                    for TE_name in FOM_store:
                        exp_TE = sim.fdtd.getresult("mode_expansion_" + TE_name, "expansion for input")
                        data_TE = np.mean(exp_TE["T_net"])

                        # round to 10^-4 resolution
                        data_TE = np.round(data_TE,4)
                        FOMs_all.append(data_TE)    

                    FOMs[i,:] = np.array([FOMs_all[j] for j in FOM_opt_id])

                    with open(csv_file, mode='a', newline='') as file:
                        writer = csv.writer(file)
                        # Write headers only if the file doesn't exist
                        if not file_exists:
                            writer.writerow(headers)
                            file_exists = True
                        
                        new_row = X[i,:].tolist() + FOMs_all
                        writer.writerow(new_row)

            id_nxt = id_nxt + parallel

        # Dtore the function values and return them to the optimizer. The negative sign is due to minimization.
        out["F"] = -FOMs

        # reset the random number generator state back to before Lumerical was called
        np.random.set_state(rng_state)

problem = MyProblem()


algorithm = NSGA2(pop_size=n_Population)

res = minimize(problem,
               algorithm,
               ('n_gen', n_Generations),
               seed=seed,
               verbose=True)

# dboule check there are no duplicates in the stored results (just for testing)
print("Duplicates? " + str(check_for_duplicates(csv_file)))

In [None]:
### Save the output of the pareto frontier independently

frontier_file = f'frontier_p{n_Population}_g{n_Generations}_s{seed}_FOM[{[FOM_store[i] for i in FOM_opt_id]}].csv'
header = str(['param_' + str(i) for i in range(n_params)] + [FOM_store[i] for i in FOM_opt_id]).replace("'","").replace("[","").replace("]","")

np.savetxt(
    frontier_file,
    np.hstack((res.X,np.abs(res.F))),
    delimiter=',',
    fmt=('%.4e',)*n_params + ('%.4f',)*len(FOM_opt_id),  # Different formats for each column
    header=header,
    comments=''
)

In [None]:
### Adjoint with multiple objectives - function definition

# input: target_eff - target power distribution between modes, such as [0.1, 0.5, 0.4]
#        init_params - starting point for Lumopt
# 
# returns a tuple of : initial FOM, final FOM, final geometry parameters, # of iterations

import os 
LUMOPT_MAX_ITER = 5

def lumopt_mode_exchanger(target_eff,init_params):

    os.chdir(prj_dir)
    # Initialize FunctionDefinedPolygon class
    polygon = FunctionDefinedPolygon(func = mode_exchanger_geom,
                                    initial_params = init_params,
                                    # initial_params = [3.40E-07, 4.02E-07, 9.66E-07, 4.88E-07, 8.23E-07, 6.92E-07, 7.60E-07, 7.18E-07, 6.82E-07, 7.01E-07],
                                    bounds = param_bounds,
                                    z = 0,
                                    depth = 220e-9,
                                    eps_out = 1.44**2,
                                    eps_in = 2.8**2,
                                    edge_precision = 5,
                                    dx = 1e-9)


    ######## FIGURE OF MERIT ########
    fom1 = ModeMatch(monitor_name = 'fom_1',
                    mode_number = 2,
                    direction = 'Forward',
                    target_T_fwd = lambda wl: np.ones(wl.size) * target_eff[0],
                    norm_p = 1)

    fom2 = ModeMatch(monitor_name = 'fom_2',
                    mode_number = 6,
                    direction = 'Forward',
                    target_T_fwd = lambda wl: np.ones(wl.size) * target_eff[1],
                    norm_p = 1)
    
    fom3 = ModeMatch(monitor_name = 'fom_3',
                    mode_number = 10,
                    direction = 'Forward',
                    target_T_fwd = lambda wl: np.ones(wl.size) * target_eff[2],
                    norm_p = 1)

    ######## OPTIMIZATION ALGORITHM ########
    optimizer = ScipyOptimizers(max_iter = LUMOPT_MAX_ITER,
                                method = 'L-BFGS-B',
                                #scaling_factor = scaling_factor,
                                pgtol = 1.0e-5,
                                ftol = 1.0e-5,
                                #target_fom = 0.0,
                                scale_initial_gradient_to = 0.0)

    wavelengths = Wavelengths(start=1530e-9, stop=1570e-9, points=11)
    script = load_from_lsf(f'mode_exchanger_lumopt.lsf')

    opt1 = Optimization(base_script=script, wavelengths=wavelengths, fom=fom1, geometry=polygon, optimizer=optimizer,
                        use_deps=True, hide_fdtd_cad=False, plot_history=False, store_all_simulations=False,
                        save_global_index=False)
    opt2 = Optimization(base_script=script, wavelengths=wavelengths, fom=fom2, geometry=polygon, optimizer=optimizer,
                        use_deps=True, hide_fdtd_cad=False, plot_history=False, store_all_simulations=False,
                        save_global_index=False)
    opt3 = Optimization(base_script=script, wavelengths=wavelengths, fom=fom3, geometry=polygon, optimizer=optimizer,
                        use_deps=True, hide_fdtd_cad=False, plot_history=False, store_all_simulations=False,
                        save_global_index=False)

    ######## PUT EVERYTHING TOGETHER AND RUN ########
    opt = SuperOptimization((opt1, opt2, opt3))

    ######## RUN THE OPTIMIZATION ########
    results = opt.run(working_dir=os.path.join(prj_dir,'optimizations','opts'))
    
    # close all sessions
    for opt_i in opt.optimizations:
        opt_i.sim.fdtd.close()

    return((np.abs(opt.fom_hist[0]),) + results + (len(opt.fom_hist),))

In [None]:
### Testing adjoint
target_eff = [0.3,0.3,0.4]
init_params = global_sample_function().flatten()
fom_start, fom_end, params_out, iter = lumopt_mode_exchanger(target_eff,init_params)
print(f'initial FOM = {fom_start}, final FOM = {fom_end}, # of iterations = {iter}')