In [1]:
# This file is part of the ADS Parameter Fitting project.
# It must be run with the Python environment bundled with ADS (A),
# namely ..\ADS_install_path\tools\python\python.exe --> Python 3.13.2
# TODO: the whole script is run in Jupyter because the ADS Python API only
# supports the IPython kernel!

# packages to build DIR env
import os, json
# set the ADS install directory (HPEESOF_DIR) and the home directory (HOME)
os.environ['HPEESOF_DIR'] = 'D:/ADS/install'
os.environ['HOME'] = 'D:/ADS/dir'

# packages to import ADS
from keysight.ads import de
from keysight.ads.de import db_uu as db
from keysight.edatoolbox import ads
import keysight.ads.dataset as dataset
from keysight.edatoolbox import util
from pathlib import Path
from IPython.core import getipython

# packages to import data analysis and save
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import time
import h5py

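# packages for multiprocessing
# NOTE(review): nothing is imported here for multiprocessing, although the
# multi-processing cell further down calls `Parallel` and `delayed`
# (presumably from joblib -- confirm and add the import).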
# add current working directory
import sys
cur_path = "E:/personal_Data/Document of School/Uni Stuttgart/Masterarbeit/Code/param_regression/ADS_Parameter_Fitting/IV_param_regression/data_gen"
sys.path.append(cur_path)

# more meaningful data generator based on log-uniform distribution
from log_data_gen import param_random_generator as param_random_generator_log

In [2]:
# necessary class and functions

class PyADS():
    """Small controller around the Keysight ADS Python API used for data generation.

    It can create a workspace/library, run one schematic simulation with a
    given set of VAR values, and reshape the resulting DataFrames into the
    arrays that are stored in the HDF5 dataset.
    """

    def __init__(self):
        # Same locations as the environment variables set at the top of the notebook.
        self.HPEESOF_DIR = 'D:/ADS/install'
        self.HOME = 'D:/ADS/dir'
        # Handles/names of whatever schematic_simulation() opened most recently.
        self.cur_workspace_path = None
        self.workspace = None
        self.cur_library_name = None
        self.library = None
        self.cur_design_name = None
        self.design = None

    def create_and_open_an_empty_workspace(self, workspace_path: str) -> "de.Workspace":
        """Create a new workspace at ``workspace_path``, open it and return it.

        Example: ``workspace_path = "C:/ADS_Python_Tutorials/tutorial1_wrk"``

        Raises:
            RuntimeError: if ``workspace_path`` already exists.
        """
        # Ensure there isn't already a workspace open
        if de.workspace_is_open():
            de.close_workspace()

        # Cannot create a workspace if the directory already exists
        if os.path.exists(workspace_path):
            raise RuntimeError(f"Workspace directory already exists: {workspace_path}")

        # Create the workspace, open it and hand the open workspace to the caller
        workspace = de.create_workspace(workspace_path)
        workspace.open()
        return workspace

    def create_a_library_and_add_it_to_the_workspace(self, workspace: "de.Workspace", library_name: str):
        """Create ``library_name`` inside the workspace directory and register it.

        Returns:
            The object returned by ``workspace.open_library`` for the new library.
        """
        # Libraries can only be added to an open workspace
        assert workspace.is_open
        # The library lives in the directory of the workspace
        library_path = workspace.path / library_name
        # Create the library
        de.create_new_library(library_name, library_path)
        # And add it to the workspace (update lib.defs)
        workspace.add_library(library_name, library_path, de.LibraryMode.SHARED)
        lib = workspace.open_library(library_name, library_path, de.LibraryMode.SHARED)
        return lib

    def schematic_simulation(self, workspace_path: str, library_name: str, design_name: str, instance_name: str, var_dict: dict, vgs_bias_param_sweep_name: str, vds_bias_param_sweep_name: str, vgs_bias_simulation_name: str, vds_bias_simulation_name: str) -> "tuple[pd.DataFrame, pd.DataFrame]":
        """Load the design, write ``var_dict`` into its VAR instance, simulate, return the data.

        Args:
            workspace_path: directory of an existing ADS workspace.
            library_name: library that contains the design.
            design_name: name of the design; its "schematic" view is opened.
            instance_name: name of the VAR instance whose variables are overwritten.
            var_dict: mapping ``variable name -> value`` written into the VAR instance.
            vgs_bias_param_sweep_name, vgs_bias_simulation_name: used to build the
                dataset key ``"<sweep>.<simulation>.DC"`` of the first result.
            vds_bias_param_sweep_name, vds_bias_simulation_name: used to build the
                dataset key ``"aele_0.<sweep>.<simulation>"`` of the second result.

        Returns:
            ``(data_ids_vds, data_ids_vgs)``: two DataFrames with the index reset.

        Raises:
            RuntimeError: if the workspace directory does not exist, or if the
                code is not running inside an IPython environment.
        """
        # The simulation part needs IPython.  Check this *before* touching the
        # design so that nothing is edited/saved when the run cannot complete,
        # and fail loudly: callers unpack two return values, so silently
        # returning None only produced an unrelated TypeError later on.
        if getipython.get_ipython() is None:
            raise RuntimeError(
                "schematic_simulation must be run in an IPython environment."
            )

        # >> Load Path and files
        if not os.path.exists(workspace_path):
            raise RuntimeError(f"Workspace directory doesn't exist: {workspace_path}")
        if de.workspace_is_open():
            de.close_workspace()

        # Open the workspace
        self.workspace = de.open_workspace(workspace_path)
        self.cur_workspace_path = workspace_path
        # Open the library
        self.library = self.workspace.open_library(lib_name=library_name, mode=de.LibraryMode.SHARED)
        self.cur_library_name = library_name
        # Open the design
        self.design = db.open_design((library_name, design_name, "schematic"), db.DesignMode.APPEND)
        self.cur_design_name = design_name

        # >> Edit the design variables
        v = self.design.get_instance(inst_name=instance_name)
        assert v.is_var_instance
        for var_name, var_value in var_dict.items():
            v.vars[var_name] = var_value
        # Save the design
        self.design.save_design()

        # >> Simulate the design
        output_dir = os.path.join(self.workspace.path, "output")
        netlist_file = os.path.join(output_dir, "data_gen.ckt")
        output_file = os.path.join(output_dir, "data_gen.ckt.out")
        # create the simulation output directory
        util.safe_makedirs(output_dir)

        # capture the netlist in a string
        netlist = self.design.generate_netlist()
        # access to the simulator object to run netlists
        simulator = ads.CircuitSimulator()
        # run the netlist, this will block output
        simulator.run_netlist(netlist, output_dir=output_dir, netlist_file=netlist_file, output_file=output_file)
        output_data = dataset.open(Path(os.path.join(output_dir, f"{design_name}.ds")))

        # >> return data in pandas DataFrame format
        data_ids_vds = output_data[f'{vgs_bias_param_sweep_name}.{vgs_bias_simulation_name}.DC'].to_dataframe().reset_index()
        data_ids_vgs = output_data[f'aele_0.{vds_bias_param_sweep_name}.{vds_bias_simulation_name}'].to_dataframe().reset_index()
        return data_ids_vds, data_ids_vgs

    @staticmethod
    def _values_at_vgs(frame: pd.DataFrame, vgs: float, column: str, expected_len: int, tol: float) -> np.ndarray:
        """Return ``frame[column]`` for the rows whose 'VGS' equals ``vgs`` within ``tol``."""
        mask = np.isclose(frame['VGS'].to_numpy(dtype=np.float64), vgs, rtol=0.0, atol=tol)
        values = frame.loc[mask, column].to_numpy()
        # Without this check a single matching row would be broadcast silently
        # over the whole output row/column.
        if values.shape[0] != expected_len:
            raise ValueError(
                f"expected {expected_len} '{column}' values at VGS={vgs}, found {values.shape[0]}"
            )
        return values

    def dataset_reshape(self, pd_data_IV: pd.DataFrame, pd_data_gm: pd.DataFrame, IV_dimension: list, gm_dimension: list, var_dict: dict,
                        vgs_iv_start: float = 1.0, vgs_gm_start: float = 1.5, vgs_step: float = 1.0, vgs_tol: float = 1e-6):
        """Reshape the simulation DataFrames into input matrices and the label vector.

        Args:
            pd_data_IV: DataFrame with columns 'VGS' and 'IDS.i'.
            pd_data_gm: DataFrame with columns 'VGS' and 'gm'.
            IV_dimension: ``[rows, cols]`` of the I-V matrix; one row per VGS value.
            gm_dimension: ``[rows, cols]`` of the gm matrix; one column per VGS value.
            var_dict: parameter values; converted to float in iteration order.
            vgs_iv_start: VGS of the first I-V row (default 1.0).
            vgs_gm_start: VGS of the first gm column (default 1.5).
            vgs_step: VGS increment between consecutive rows/columns (default 1.0).
            vgs_tol: absolute tolerance used when matching VGS values.

        Returns:
            ``(output_x_IV, output_x_gm, output_y)`` as float64 arrays of shape
            ``IV_dimension``, ``gm_dimension`` and ``(len(var_dict), 1)``.

        Raises:
            ValueError: if the number of samples found for a VGS value does not
                match the requested matrix dimension.
        """
        IV_row_count, IV_col_count = IV_dimension[0], IV_dimension[1]  # Vgs, Vds
        gm_row_count, gm_col_count = gm_dimension[0], gm_dimension[1]  # Vds, Vgs

        output_x_IV = np.empty((IV_row_count, IV_col_count), dtype=np.float64)
        output_x_gm = np.empty((gm_row_count, gm_col_count), dtype=np.float64)

        for row in range(IV_row_count):
            vgs = vgs_iv_start + row * vgs_step
            output_x_IV[row, :] = self._values_at_vgs(pd_data_IV, vgs, 'IDS.i', IV_col_count, vgs_tol)
        for col in range(gm_col_count):
            vgs = vgs_gm_start + col * vgs_step
            output_x_gm[:, col] = self._values_at_vgs(pd_data_gm, vgs, 'gm', gm_row_count, vgs_tol)

        # The parameter values may be given as strings (e.g. '2e+17'); convert explicitly.
        output_y = np.array([float(value) for value in var_dict.values()], dtype=np.float64).reshape(-1, 1)

        return output_x_IV, output_x_gm, output_y
    

def param_random_generator(param_range: dict):
    """Draw one uniformly distributed random value per HEMT model parameter.

    Args:
        param_range: mapping ``name -> (low, high)``, for example
            ``{'VOFF': (-1.2, 2.6), 'U0': (0, 2.2), 'DELTA': (2, 100)}``.

    Returns:
        dict mapping every parameter name to its sampled value, formatted as a
        string.
    """
    sampled = {}
    for name, bounds in param_range.items():
        lower, upper = bounds[0], bounds[1]
        draw = np.random.uniform(low=lower, high=upper)
        sampled[name] = str(draw)
    return sampled

def init_h5_file(h5_path, x_iv_shape, x_gm_shape, y_shape,
                 dtype_x=np.float64, dtype_y=np.float64):
    """Create (or overwrite) an HDF5 file holding three empty, growable datasets.

    Datasets created, each with 0 samples and an unlimited first axis:
      * 'X_iv': per-sample shape ``(x_iv_shape[0], x_iv_shape[1])``, dtype ``dtype_x``
      * 'X_gm': per-sample shape ``(x_gm_shape[0], x_gm_shape[1])``, dtype ``dtype_x``
      * 'Y'   : per-sample shape ``(y_shape[0], 1)``, dtype ``dtype_y``
    """
    with h5py.File(h5_path, 'w') as h5_file:
        layout = (
            ('X_iv', (x_iv_shape[0], x_iv_shape[1]), dtype_x),
            ('X_gm', (x_gm_shape[0], x_gm_shape[1]), dtype_x),
            ('Y', (y_shape[0], 1), dtype_y),
        )
        for ds_name, sample_shape, ds_dtype in layout:
            h5_file.create_dataset(
                ds_name,
                shape=(0, *sample_shape),
                maxshape=(None, *sample_shape),
                dtype=ds_dtype,
            )

def append_to_h5(h5_path, x_iv_new, x_gm_new, y_new):
    """Append exactly one sample to the 'X_iv', 'X_gm' and 'Y' datasets of an HDF5 file.

    Args:
        h5_path: path of an HDF5 file that already contains the three datasets.
        x_iv_new: one I-V matrix; must match the per-sample shape of 'X_iv'.
        x_gm_new: one gm matrix; must match the per-sample shape of 'X_gm'.
        y_new: parameter vector, given 1-D or 2-D; it is stored as a column
            ``(y_len, 1)``.

    Raises:
        RuntimeError: if ``y_new`` is neither 1-D nor 2-D.
        ValueError: if a sample does not match the per-sample shape of its
            dataset.  This is checked *before* any dataset is resized, so a
            rejected sample never leaves an empty row behind.
    """
    x_iv_new = np.asarray(x_iv_new, dtype=np.float64)
    x_gm_new = np.asarray(x_gm_new, dtype=np.float64)
    y_new = np.asarray(y_new, dtype=np.float64)

    # Normalise y_new to a column vector (y_len, 1)
    if y_new.ndim in (1, 2):
        y_new = y_new.reshape(-1, 1)
    else:
        raise RuntimeError(f"y_new must be vector, but got shape {y_new.shape}")

    with h5py.File(h5_path, 'a') as f:
        ds_x_iv = f['X_iv']
        ds_x_gm = f['X_gm']
        ds_y = f['Y']

        # Validate everything first: resizing and then failing on the write
        # would leave a fill-value row in the datasets.
        for ds_name, ds, sample in (('X_iv', ds_x_iv, x_iv_new),
                                    ('X_gm', ds_x_gm, x_gm_new),
                                    ('Y', ds_y, y_new)):
            if tuple(sample.shape) != tuple(ds.shape[1:]):
                raise ValueError(
                    f"sample for '{ds_name}' has shape {sample.shape}, "
                    f"expected {tuple(ds.shape[1:])}"
                )

        cur_len = ds_x_iv.shape[0]
        new_len = cur_len + 1

        # Grow every dataset by one sample
        ds_x_iv.resize(new_len, axis=0)
        ds_x_gm.resize(new_len, axis=0)
        ds_y.resize(new_len, axis=0)

        # Write the new sample; the explicit leading axis makes the source
        # shape equal to the selected region instead of relying on broadcasting.
        ds_x_iv[cur_len:new_len, :, :] = x_iv_new[np.newaxis, ...]
        ds_x_gm[cur_len:new_len, :, :] = x_gm_new[np.newaxis, ...]
        ds_y[cur_len:new_len, :, :] = y_new[np.newaxis, ...]


def singel_process_iteration_data_gen2h5(workspace_path: str, validate_dict: dict, library_name: str, design_name: str, instance_name: str, param_range: dict, vgs_bias_param_sweep_name: str, vds_bias_param_sweep_name: str, vgs_bias_simulation_name: str, vds_bias_simulation_name: str, iteration_num: int, process_id: int, save_path: str, dtype_x=np.float64, dtype_y=np.float64):
    """Generate ``iteration_num`` samples in one process and store them in an HDF5 file.

    The file ``<save_path>\\dataset_process_<process_id>.h5`` is (re)created at
    the start.  Each iteration picks a parameter set, runs the ADS schematic
    simulation, reshapes the result and appends one sample to the file.

    Args:
        workspace_path, library_name, design_name, instance_name: forwarded to
            ``PyADS.schematic_simulation``.
        validate_dict: if truthy, this fixed parameter set is simulated in every
            iteration instead of a randomly generated one.
        param_range: parameter ranges handed to the log-uniform generator.
        vgs_bias_param_sweep_name, vds_bias_param_sweep_name,
        vgs_bias_simulation_name, vds_bias_simulation_name: forwarded to
            ``PyADS.schematic_simulation``.
        iteration_num: number of samples to generate.
        process_id: identifier used in the file name and in log messages.
        save_path: directory of the output file.
        dtype_x, dtype_y: dtypes of the HDF5 datasets created for inputs/labels.
    """
    # create an instance of the PyADS class
    ads_ctrl = PyADS()
    # Per-sample shapes of the stored arrays
    X_iv_shape = [7,121]
    X_gm_shape = [121,6]
    # X_iv_shape = [7,186]
    # X_gm_shape = [186,6]
    Y_shape = [11,1]

    # Build the output path once so that init and append always agree
    h5_path = f"{save_path}\\dataset_process_{process_id}.h5"

    # init h5 file (the dtype arguments were previously accepted but ignored)
    init_h5_file(h5_path, X_iv_shape, X_gm_shape, Y_shape, dtype_x=dtype_x, dtype_y=dtype_y)

    for i in range(iteration_num):
        start_time = time.perf_counter()
        if validate_dict:
            var_dict = validate_dict
        else:
            # var_dict = param_random_generator(param_range)
            var_dict = param_random_generator_log(param_range) # use log-uniform distribution based generator
        pd_data_vgs_bias, pd_data_gm = ads_ctrl.schematic_simulation(
            workspace_path,
            library_name,
            design_name,
            instance_name,
            var_dict,
            vgs_bias_param_sweep_name,
            vds_bias_param_sweep_name,
            vgs_bias_simulation_name,
            vds_bias_simulation_name
        )
        X_iv, X_gm, y = ads_ctrl.dataset_reshape(pd_data_vgs_bias, pd_data_gm, X_iv_shape, X_gm_shape, var_dict)
        end_time = time.perf_counter()
        print(f' >> Process {process_id} :: Loop {i + 1}/{iteration_num} :: used time:', round(end_time - start_time, 2), 's')

        # A failed append skips this sample but keeps the run going.  Only
        # ordinary errors are caught (KeyboardInterrupt still stops the run),
        # and the cause is reported instead of being discarded.
        try:
            append_to_h5(h5_path, X_iv, X_gm, y)
        except Exception as exc:
            print(f"【ERROR】Error appending data in process {process_id} at iteration {i + 1}: {exc!r}")

In [None]:
# RUN CONFIGURATION

# --- ADS workspace / design under test ---
workspace_path = r"E:\personal_Data\Document of School\Uni Stuttgart\Masterarbeit\Simulation\ADS\ASM_HEMT1_wrk_1_Jia"
library_name = "IAF_pGaN_lib"
design_name = "gs66508bv1_Pytest_simple_paramset"
instance_name = "IV"

# --- names of the sweep / simulation controllers in the schematic ---
vgs_bias_param_sweep_name = 'Sweep_vgs'
vgs_bias_simulation_name = 'DC1'
vds_bias_param_sweep_name = 'Sweep_vds'
vds_bias_simulation_name = 'DC2'

# --- model parameters ---
# Fixed parameter set to simulate in every iteration; None -> draw random sets
validate_dict = None
# Example parameter set (values are strings)
var_dict = dict(
    VOFF='1.785',
    U0='0.424',
    NS0ACCS='2e+17',
    NFACTOR='1',
    ETA0='0.06',
    VSAT='8e+4',
    VDSCALE='5',
    CDSCD='0.1',
    LAMBDA='0.01',
    MEXPACCD='1.5',
    DELTA='3',
)
# (low, high) sampling range per parameter
param_range = dict(
    VOFF=(-1.2, 2.6),
    U0=(0, 2.2),
    NS0ACCS=(1e15, 1e20),
    NFACTOR=(0.1, 5),
    ETA0=(0, 1),
    VSAT=(5e4, 1e7),
    VDSCALE=(0.5, 1e6),
    CDSCD=(1e-5, 0.75),
    LAMBDA=(0, 0.2),
    MEXPACCD=(0.05, 12),
    DELTA=(2, 100),
)

# --- run size and output ---
iteration_num = 1000
process_id = 1
data_save_path = r"C:\Users\97427\Desktop\testttttttttttttt"

# for multiprocessing
process_count = 1

In [None]:
# Single-process data generation: one run that writes
# dataset_process_<process_id>.h5 into data_save_path.

singel_process_iteration_data_gen2h5(
    workspace_path=workspace_path,
    validate_dict=validate_dict,
    library_name=library_name,
    design_name=design_name,
    instance_name=instance_name,
    param_range=param_range,
    vgs_bias_param_sweep_name=vgs_bias_param_sweep_name,
    vds_bias_param_sweep_name=vds_bias_param_sweep_name,
    vgs_bias_simulation_name=vgs_bias_simulation_name,
    vds_bias_simulation_name=vds_bias_simulation_name,
    iteration_num=iteration_num,
    process_id=process_id,
    save_path=data_save_path,
)


In [None]:
# main function to run the data generation - single process, repeated runs
# Each run writes its own file dataset_process_<run_id>.h5 with `iteration_num`
# samples, i.e. total_iteration_num * iteration_num samples in total.

total_iteration_num = 10

# A dedicated loop variable is used on purpose: the previous loop variable
# `iter` shadowed the builtin for the rest of the kernel session, and
# reassigning the global `process_id` changed which file the other cells
# read or overwrite when they are re-run afterwards.
for run_id in range(1, total_iteration_num + 1):
    singel_process_iteration_data_gen2h5(
        workspace_path,
        validate_dict,
        library_name,
        design_name,
        instance_name,
        param_range,
        vgs_bias_param_sweep_name,
        vds_bias_param_sweep_name,
        vgs_bias_simulation_name,
        vds_bias_simulation_name,
        iteration_num,
        run_id,
        data_save_path
    )


In [None]:
# main function to run the data generation - multi-processing
# NOTE(review): `Parallel` and `delayed` are not imported anywhere in the
# visible part of this notebook; they look like joblib's API (backend="loky")
# -- confirm and import them in the first cell, otherwise this cell raises
# NameError on a fresh kernel.

# create parameter vector for multiple processes
# One argument tuple per process, in the positional order expected by
# singel_process_iteration_data_gen2h5; only process_id differs between tuples.
param_vector = [(workspace_path, 
                validate_dict,
                library_name, 
                design_name, 
                instance_name,
                param_range, 
                vgs_bias_param_sweep_name,
                vds_bias_param_sweep_name,
                vgs_bias_simulation_name,
                vds_bias_simulation_name,
                iteration_num,
                process_id, 
                data_save_path) 
                for process_id in range(1, process_count + 1)]

# run the data generation in multiple processes
# NOTE(review): n_jobs is fixed to 4 while the number of tasks is
# process_count (1 in the configuration cell) -- confirm the intended value.
# try:
Parallel(n_jobs=4, backend="loky")(delayed(singel_process_iteration_data_gen2h5)(*p) for p in param_vector)
# except:
#     raise ConnectionError(f"Error in parallel processing. Please check the parameters and the ADS environment.")



In [None]:
# TODO: This part is only for testing, to verify the content of saved data

# 1-based number of the sample to inspect, converted to a 0-based index
test_index = 1
test_index -= 1

# load data
# with h5py.File(f"{data_save_path}\\dataset_process_x.h5", 'r') as f:
# NOTE(review): the generation cells in this notebook write process ids
# starting at 1; confirm that a file with id 9999 exists.
with h5py.File(f"{data_save_path}\\dataset_process_{9999}.h5", 'r') as f:
    print('data structure: ', f.keys())
    print('X_iv shape: ', f['X_iv'].shape)
    print('X_gm shape: ', f['X_gm'].shape)
    print('Y shape: ', f['Y'].shape)

    # Read the selected sample once instead of hitting the file on every
    # loop iteration; the plots below then work on in-memory arrays.
    x_iv_sample = f['X_iv'][test_index]  # one row per VGS value
    x_gm_sample = f['X_gm'][test_index]  # one row per VDS value

# VDS axis of the I-V curves, sized from the I-V data itself
vds_axis = np.linspace(-3.5, 15, x_iv_sample.shape[1])
# VDS value of every gm curve (used for colouring) and the VGS axis of the gm plot
legend_values = np.linspace(-3.5, 15, x_gm_sample.shape[0])
vgs_axis_gm = 1.5 + np.arange(x_gm_sample.shape[1], dtype=np.float64)

# create colormap
cmap = plt.cm.gist_ncar
norm = plt.Normalize(vmin=legend_values.min(), vmax=legend_values.max())

fig, ax = plt.subplots(1, 2, figsize=(15, 8))

for i in range(x_iv_sample.shape[0]):
    ax[0].plot(vds_axis, x_iv_sample[i, :].flatten(), label=f"VGS={i+1} V")
# Decorate each axes once.  grid(True) is explicit on purpose: a bare grid()
# toggles the grid, so calling it inside the loop depended on the curve count.
ax[0].legend()
ax[0].grid(True)
ax[0].set_title('test plot of I-V')
ax[0].set_xlabel("VDS (V)")
ax[0].set_ylabel("IDS (A)")

for i in range(x_gm_sample.shape[0]):
    color = cmap(norm(legend_values[i]))
    ax[1].plot(vgs_axis_gm, x_gm_sample[i, :], color=color)
ax[1].grid(True)
ax[1].set_title('test plot of gm')
ax[1].set_xlabel("VGS (V)")
ax[1].set_ylabel("gm")

# add colorbar
sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
sm.set_array([])  # only needed for the colorbar
cbar = fig.colorbar(sm, ax=ax)
cbar.set_label("VDS (V)")
plt.show()
    

In [None]:
# testing: verify gm from both Ids-Vds and Ids-Vgs

process_id = 1
# NOTE(review): used directly as a 0-based index here (second sample), while
# the inspection cell above converts a 1-based number first -- confirm intent.
test_index = 1

with h5py.File(f"{data_save_path}\\dataset_process_{process_id}.h5", 'r') as f:
    # Read the sample once; the previous element-wise loop issued one HDF5
    # read per matrix entry.
    x_iv_sample = f['X_iv'][test_index]  # one row per VGS value
    x_gm_sample = f['X_gm'][test_index]  # one row per VDS value

# Difference of the drain current between neighbouring VGS rows, transposed so
# that it has the same layout as X_gm.  Same values as the former loop
# gm_test[i, j] = X_iv[j + 1, i] - X_iv[j, i], without hard-coded sizes.
gm_test = np.diff(x_iv_sample, axis=0).T

# NOTE(review): the inspection cell above uses a VDS range of -3.5..15 V for
# the same data, this one -3.5..20 V -- confirm which one matches the sweep.
legend_values = np.linspace(-3.5, 20, x_gm_sample.shape[0])
vgs_axis_gm = 1.5 + np.arange(x_gm_sample.shape[1], dtype=np.float64)

# create colormap
cmap = plt.cm.gist_ncar
norm = plt.Normalize(vmin=legend_values.min(), vmax=legend_values.max())

fig, ax = plt.subplots(1, 3, figsize=(20, 8))
for i in range(x_gm_sample.shape[0]):
    color = cmap(norm(legend_values[i]))
    # plot 1: gm derived from the Ids-Vds data
    ax[0].plot(vgs_axis_gm, gm_test[i, :], color=color)
    # plot 2: gm taken from the Ids-Vgs simulation
    ax[1].plot(vgs_axis_gm, x_gm_sample[i, :], color=color)
    # plot 3: difference between both
    ax[2].plot(vgs_axis_gm, x_gm_sample[i, :] - gm_test[i, :], color=color)

# Decorate every axes once instead of once per curve
panel_labels = (
    ('gm from Ids-Vds', "gm"),
    ('gm from Ids-Vgs', "gm"),
    ('error of both way to get gm', "error"),
)
for axis, (title, ylabel) in zip(ax, panel_labels):
    axis.grid(True)
    axis.set_title(title)
    axis.set_xlabel("VGS (V)")
    axis.set_ylabel(ylabel)

# add colorbar
sm = plt.cm.ScalarMappable(cmap=cmap, norm=norm)
sm.set_array([])  # only for colorbar
cbar = fig.colorbar(sm, ax=ax)
cbar.set_label("VDS (V)")
plt.show()
    