In [None]:
from pathlib import Path
import json
from copy import deepcopy
import getpass
from typing import Dict, List
import os

os.environ['PAMILA_FACILITY']

In [None]:
import numpy as np
import yaml

class CustomDumper(yaml.SafeDumper):
    def represent_list(self, data):
        # Force lists to be represented in flow style (inline `[]` style)
        return self.represent_sequence("tag:yaml.org,2002:seq", data, flow_style=True)

# Add the custom list representation to the dumper
CustomDumper.add_representer(list, CustomDumper.represent_list)

In [None]:
# You can ignore about the `pydantic` deprecation warning (coming from `tiled`)
import pamila as pml
from pamila.sim_interface import PyATInterfaceSpec
from pamila.facility_configs.generator import (
    PamilaElementDefinition,
    ExtIntStrList,
    MachineModeSpecContainer,
    StandardReadbackDeviceDefinition as standard_RB,
    StandardSetpointDeviceDefinition as standard_SP,
    SetpointReadbackDiffDefinition as SP_RB_Diff_Def,
    ChannelSpec,
    ConversionFuncSpec,
)
from pamila.device.simple import FixedWaitTime
from pamila.device.specs import FunctionSpec
from pamila import Q_

In [None]:
assert pml.machine.get_facility_name() == os.environ['PAMILA_FACILITY']
print(pml.__version__)
print(pml.__file__)

In [None]:
machine_name = "SR" # := NSLS-II Storage Ring

sim_configs = {
    "facility": os.environ['PAMILA_FACILITY'],
    "machine": machine_name,
    "simulator_configs": {"no_simulator": None}
}

In [None]:
facility_folder = Path(sim_configs['facility'])
if not facility_folder.exists():
    facility_folder.mkdir()

In [None]:
machine_folder = facility_folder / machine_name
if not machine_folder.exists():
    machine_folder.mkdir()

machine_mlvs = pml.middle_layer.get_all_mlvs(machine_name)
machine_mlvs

In [None]:
import at

lattice_filepath = Path("lattice_files/nsls2_girders4d_pyATv0_6_1.mat")
lattice = at.load_lattice(lattice_filepath)
bpm_uint32_inds = at.get_uint32_index(lattice, "P[HLM]*")

sim_configs["simulator_configs"]["pyat"] = {
    "package_name": "pyat",
    "lattice_filepath": str(lattice_filepath.resolve()),
    "closed_orbit_uint32_indexes": bpm_uint32_inds.tolist(),
}
sim_configs["selected_config"] = "pyat"

In [None]:
output_filepath = machine_folder / "sim_configs.yaml"
with open(output_filepath, "w") as f:
    yaml.dump(
        sim_configs,
        f,
        sort_keys=False,
        default_flow_style=False,
        width=70,
        indent=2,
        Dumper=CustomDumper,
    )

print(f"{output_filepath = }\n")
print(output_filepath.read_text())

In [None]:
sel_config_folder = machine_folder / sim_configs["selected_config"]
sel_config_folder.mkdir(parents=True, exist_ok=True)

In [None]:
# Check simulation interface is working
sim_conf_d = sim_configs["simulator_configs"][sim_configs["selected_config"]]

match sim_conf_d["package_name"]:
    case "pyat":
        sim_itf_spec = PyATInterfaceSpec(**sim_conf_d)
    case _:
        raise NotImplementedError

sim_itf = pml.sim_interface.create_interface(sim_itf_spec, pml.MachineMode.SIMULATOR)
at = sim_itf.package
lattice = sim_itf.get_lattice()

In [None]:
USERNAME = getpass.getuser()

def _get_blank_elem_def(elem_defs, elem_name):
    if elem_name not in elem_defs:
        elem_defs[elem_name] = PamilaElementDefinition(name=elem_name)
    return elem_defs[elem_name]

pv_defs = []
simpv_defs = []
elem_defs = {}

# BPM definitions

In [None]:
def process_bpm_definition(
    simpv_defs: List,
    elem_defs: Dict,
    at_elem_name: str,
    pml_elem_name: str,
    pvname_d: Dict,
    sim_pvsuffix_d: Dict):

    pv_def = dict(elem_name=pml_elem_name, handle=None, ext={}, int={})

    matched_indexes = at.get_uint32_index(lattice, at_elem_name)
    assert len(matched_indexes) == 1
    lattice_index = int(
        matched_indexes[0]
    )  # Avoid numpy.uint32 that prevents saving into JSON/YAML
    elem = lattice[lattice_index]
    elem_type = elem.definition[0]

    info_to_check = dict(
        elem_name=at_elem_name, elem_type=elem_type, lattice_index=lattice_index
    )

    pdev_standard_RB_def = MachineModeSpecContainer(
        LIVE=standard_RB(), DT=standard_RB(), SIM=standard_RB()
    )

    pv_def_RB_list = []
    pv_def_SP_list = []

    for plane, pvname in pvname_d.items():
        pv_def_RB = deepcopy(pv_def)
        pv_def_SP = None

        pv_def_RB["handle"] = "RB"

        pv_def_RB["ext"]["pvname"] = dict(
            LIVE=pvname,
            DT=f"{USERNAME}:{pvname}" # PV name for DT (Digital Twin)
        )

        pv_def_RB["ext"]["pvunit"] = dict(LIVE="mm", DT="mm")

        ext_pvid = f"extpv_{plane}_RB"
        int_pvid = f"intpv_{plane}"

        pv_def_RB["ext"]["pvid_in_elem"] = ext_pvid

        sim_pvsuffix = sim_pvsuffix_d[plane]
        # sim_pvname = f"SIMPV:{sim_pvsuffix}"

        pv_def_RB["int"]["pvsuffix"] = sim_pvsuffix
        pv_def_RB["int"]["pvunit"] = "m"
        pv_def_RB["int"]["pvid_in_elem"] = int_pvid
        pv_def_RB["int"]["info_to_check"] = info_to_check

        simpv_defs.append(
            dict(pvclass="BPMSimPV", pvsuffix=sim_pvsuffix, args=[lattice_index, plane])
        )

        elem_def = _get_blank_elem_def(elem_defs, pv_def["elem_name"])

        repr_str = plane

        elem_def.repr_units[repr_str] = "mm"
        elem_def.pvid_to_repr_map.ext[ext_pvid] = repr_str
        elem_def.pvid_to_repr_map.int[int_pvid] = repr_str

        elem_def.channel_map[f"{plane}_RB"] = ChannelSpec(
            handle="RB",
            reprs=[repr_str],
            pvs=ExtIntStrList(ext=[ext_pvid], int=[int_pvid]),
            pdev_def=pdev_standard_RB_def,
        )

        pv_def_RB_list.append(pv_def_RB)
        if pv_def_SP is not None:
            pv_def_SP_list.append(pv_def_SP)

    return pv_def_RB_list, pv_def_SP_list

In [None]:
bpm_info_list = [
    dict(at_elem_name = "PH1G2C30A",
         pml_elem_name = "C30_P1",
         pvname_d = {"x": 'SR:C30-BI{BPM:1}Pos:X-I',
                     "y": 'SR:C30-BI{BPM:1}Pos:Y-I'},
         sim_pvsuffix_d = {"x": 'C30_P1_X',
                           "y": 'C30_P1_Y'}
    ),
    dict(at_elem_name = "PH2G2C30A",
         pml_elem_name = "C30_P2",
         pvname_d = {"x": 'SR:C30-BI{BPM:2}Pos:X-I',
                     "y": 'SR:C30-BI{BPM:2}Pos:Y-I'},
         sim_pvsuffix_d = {"x": 'C30_P2_X',
                           "y": 'C30_P2_Y'}
    ),
    dict(at_elem_name = "PM1G4C30A",
         pml_elem_name = "C30_P3",
         pvname_d = {"x": 'SR:C30-BI{BPM:3}Pos:X-I',
                     "y": 'SR:C30-BI{BPM:3}Pos:Y-I'},
         sim_pvsuffix_d = {"x": 'C30_P3_X',
                           "y": 'C30_P3_Y'}
    ),
    dict(at_elem_name = "PM1G4C30B",
         pml_elem_name = "C30_P4",
         pvname_d = {"x": 'SR:C30-BI{BPM:4}Pos:X-I',
                     "y": 'SR:C30-BI{BPM:4}Pos:Y-I'},
         sim_pvsuffix_d = {"x": 'C30_P4_X',
                           "y": 'C30_P4_Y'}
    ),
    dict(at_elem_name = "PL2G6C30B",
         pml_elem_name = "C30_P5",
         pvname_d = {"x": 'SR:C30-BI{BPM:5}Pos:X-I',
                     "y": 'SR:C30-BI{BPM:5}Pos:Y-I'},
         sim_pvsuffix_d = {"x": 'C30_P5_X',
                           "y": 'C30_P5_Y'}
    ),
    dict(at_elem_name = "PL1G6C30B",
         pml_elem_name = "C30_P6",
         pvname_d = {"x": 'SR:C30-BI{BPM:6}Pos:X-I',
                     "y": 'SR:C30-BI{BPM:6}Pos:Y-I'},
         sim_pvsuffix_d = {"x": 'C30_P6_X',
                           "y": 'C30_P6_Y'}
    ),
]

In [None]:
for d in bpm_info_list:
    pv_def_RB_list, pv_def_SP_list = process_bpm_definition(
        simpv_defs,
        elem_defs,
        d["at_elem_name"],
        d["pml_elem_name"],
        d["pvname_d"],
        d["sim_pvsuffix_d"])

    pv_defs.extend(pv_def_RB_list + pv_def_SP_list)

# Orbit Corrector (Steerer) Definitions

In [None]:
def process_corrector_definition(
    simpv_defs: List,
    elem_defs: Dict,
    at_elem_name: str,
    pml_elem_name: str,
    get_pvname_d: Dict,
    put_pvname_d: Dict,
    sim_pvsuffix_d: Dict,
    conv_func_specs: List[Dict]):

    pv_def = dict(elem_name=pml_elem_name, handle=None, ext={}, int={})

    matched_indexes = at.get_uint32_index(lattice, at_elem_name)
    assert len(matched_indexes) == 1
    lattice_index = int(
        matched_indexes[0]
    )  # Avoid numpy.uint32 that prevents saving into JSON/YAML
    elem = lattice[lattice_index]
    elem_type = elem.definition[0]

    info_to_check = dict(
        elem_name=at_elem_name, elem_type=elem_type, lattice_index=lattice_index
    )

    pdev_standard_RB_def = MachineModeSpecContainer(
        LIVE=standard_RB(), DT=standard_RB(), SIM=standard_RB()
    )

    pv_def_SP_list = []
    pv_def_RB_list = []

    for plane, get_pvname in get_pvname_d.items():
        put_pvname = put_pvname_d[plane]

        sim_pvsuffix = sim_pvsuffix_d[plane]
        # sim_pvname = f"SIMPV:{sim_pvsuffix}"

        ext_SP_pvid = f"extpv_{plane}_I_SP"
        ext_RB_pvid = f"extpv_{plane}_I_RB"
        int_pvid = f"intpv_{plane}_angle"

        pv_def_SP = deepcopy(pv_def)
        pv_def_SP["handle"] = "SP"
        pv_def_SP["ext"]["pvname"] = dict(
            LIVE=put_pvname,
            DT=f"{USERNAME}:{put_pvname}" # PV name for DT (Digital Twin)
        )
        pv_def_SP["ext"]["pvunit"] = dict(LIVE="A", DT="A")
        pv_def_SP["ext"]["pvid_in_elem"] = ext_SP_pvid
        pv_def_SP["int"]["pvsuffix"] = sim_pvsuffix
        pv_def_SP["int"]["pvunit"] = "rad"
        pv_def_SP["int"]["pvid_in_elem"] = int_pvid
        pv_def_SP["int"]["info_to_check"] = info_to_check

        pv_def_RB = deepcopy(pv_def)
        pv_def_RB["handle"] = "RB"
        pv_def_RB["ext"]["pvname"] = dict(
            LIVE=get_pvname,
            DT=f"{USERNAME}:{get_pvname}" # PV name for DT (Digital Twin)
        )
        pv_def_RB["ext"]["pvunit"] = dict(LIVE="A", DT="A")
        pv_def_RB["ext"]["pvid_in_elem"] = ext_RB_pvid
        pv_def_RB["int"]["same_as_SP"] = True
        pv_def_RB["int"]["pvid_in_elem"] = pv_def_SP["int"]["pvid_in_elem"]

        simpv_defs.append(
            dict(pvclass="CorrectorSimPV",
                 pvsuffix=sim_pvsuffix,
                 args=[lattice_index, plane])
        )

        elem_def = _get_blank_elem_def(elem_defs, pv_def["elem_name"])

        repr_I = f"{plane}_I"
        repr_angle = f"{plane}_angle"

        elem_def.repr_units[repr_I] = "A"
        elem_def.pvid_to_repr_map.ext[ext_SP_pvid] = repr_I
        elem_def.pvid_to_repr_map.ext[ext_RB_pvid] = repr_I

        elem_def.repr_units[repr_angle] = "mrad"
        elem_def.pvid_to_repr_map.int[int_pvid] = repr_angle

        elem_def.channel_map[f"{repr_I}_RB"] = ChannelSpec(
            handle="RB",
            reprs=[repr_I],
            pvs=ExtIntStrList(ext=[ext_RB_pvid], int=[int_pvid]),
            pdev_def=pdev_standard_RB_def,
        )
        elem_def.channel_map[f"{repr_angle}_RB"] = ChannelSpec(
            handle="RB",
            reprs=[repr_angle],
            pvs=ExtIntStrList(ext=[ext_RB_pvid], int=[int_pvid]),
            pdev_def=pdev_standard_RB_def,
        )

        sp_rb_diff = SP_RB_Diff_Def(
            RB_channel=f"{repr_I}_RB",
            abs_tol=Q_("0.01 A"),
            rel_tol=None,
            timeout=Q_("10 s"),
            settle_time=Q_("2 s"),
            poll_time=Q_("0.5 s"),
        )
        pdev_SP_def = MachineModeSpecContainer(
            LIVE=standard_SP(
                set_wait_method="SP_RB_diff",
                SP_RB_diff=sp_rb_diff,
                fixed_wait_time=FixedWaitTime(dt=Q_("1.0 s")),
            ),
            DT=standard_SP(
                set_wait_method="fixed_wait_time",
                SP_RB_diff=sp_rb_diff,
                fixed_wait_time=FixedWaitTime(dt=Q_("1.0 s")),
            ),
            SIM=standard_SP(),
        )
        elem_def.channel_map[f"{repr_I}_SP"] = ChannelSpec(
            handle="SP",
            reprs=[repr_I],
            pvs=ExtIntStrList(ext=[ext_SP_pvid], int=[int_pvid]),
            pdev_def=pdev_SP_def,
        )

        sp_rb_diff = SP_RB_Diff_Def(
            RB_channel=f"{repr_angle}_RB",
            abs_tol=Q_("2 urad"),
            rel_tol=None,
            timeout=Q_("10 s"),
            settle_time=Q_("2 s"),
            poll_time=Q_("0.5 s"),
        )
        pdev_SP_def = MachineModeSpecContainer(
            LIVE=standard_SP(
                set_wait_method="SP_RB_diff",
                SP_RB_diff=sp_rb_diff,
                fixed_wait_time=FixedWaitTime(dt=Q_("1.0 s")),
            ),
            DT=standard_SP(
                set_wait_method="fixed_wait_time",
                SP_RB_diff=sp_rb_diff,
                fixed_wait_time=FixedWaitTime(dt=Q_("1.0 s")),
            ),
            SIM=standard_SP(),
        )
        elem_def.channel_map[f"{repr_angle}_SP"] = ChannelSpec(
            handle="SP",
            reprs=[repr_angle],
            pvs=ExtIntStrList(ext=[ext_SP_pvid], int=[int_pvid]),
            pdev_def=pdev_SP_def,
        )

        for in_reprs, out_reprs in [
            ([repr_I], [repr_angle]),
            ([repr_angle], [repr_I]),
            ]:

            func_spec = None
            for spec in conv_func_specs:
                if (spec['in_reprs'] == in_reprs) and (spec['out_reprs'] == out_reprs):
                    func_spec = FunctionSpec(**spec['func_spec'])
                    break
            assert func_spec is not None

            elem_def.func_specs.append(
                ConversionFuncSpec(
                    in_reprs=in_reprs, out_reprs=out_reprs, func_spec=func_spec
                )
            )

        pv_def_RB_list.append(pv_def_RB)
        if pv_def_SP is not None:
            pv_def_SP_list.append(pv_def_SP)

    return pv_def_RB_list, pv_def_SP_list

In [None]:
cor_info_list = [
    dict(at_elem_name = "CH1XG2C30A",
         pml_elem_name = "C30_C1",
         get_pvname_d = {"x": 'SR:C30-MG{PS:CH1A}I:Ps1DCCT1-I'},
         put_pvname_d = {"x": 'SR:C30-MG{PS:CH1A}I:Sp1-SP'},
         sim_pvsuffix_d = {"x": 'C30_C1_X'},
         conv_func_specs=[
            {"in_reprs": ["x_I"],
             "out_reprs": ["x_angle"],
             "func_spec": {"name": "poly1d", "args": [[-0.05528125727506806, 0.0]]}},
            {"in_reprs": ["x_angle"],
             "out_reprs": ["x_I"],
             "func_spec": {"name": "poly1d", "args": [[-18.089313617166983, 0.0]]}}
            ]
    ),
    dict(at_elem_name = "CH1YG2C30A",
         pml_elem_name = "C30_C1",
         get_pvname_d = {"y": 'SR:C30-MG{PS:CH1A}I:Ps2DCCT1-I'},
         put_pvname_d = {"y": 'SR:C30-MG{PS:CH1A}I:Sp2-SP'},
         sim_pvsuffix_d = {"y": 'C30_C1_Y'},
         conv_func_specs=[
            {"in_reprs": ["y_I"],
             "out_reprs": ["y_angle"],
             "func_spec": {"name": "poly1d", "args": [[-0.045499498547631176, 0.0]]}},
            {"in_reprs": ["y_angle"],
             "out_reprs": ["y_I"],
             "func_spec": {"name": "poly1d", "args": [[-21.978264198959234, 0.0]]}}
            ]
    ),
    dict(at_elem_name = "CH2XG2C30A",
         pml_elem_name = "C30_C2",
         get_pvname_d = {"x": 'SR:C30-MG{PS:CH2A}I:Ps1DCCT1-I'},
         put_pvname_d = {"x": 'SR:C30-MG{PS:CH2A}I:Sp1-SP'},
         sim_pvsuffix_d = {"x": 'C30_C2_X'},
         conv_func_specs=[
            {"in_reprs": ["x_I"],
             "out_reprs": ["x_angle"],
             "func_spec": {"name": "poly1d", "args": [[-0.054294962028959434, 0.0]]}},
            {"in_reprs": ["x_angle"],
             "out_reprs": ["x_I"],
             "func_spec": {"name": "poly1d", "args": [[-18.417915081451344, 0.0]]}},
            ]
    ),
    dict(at_elem_name = "CH2YG2C30A",
         pml_elem_name = "C30_C2",
         get_pvname_d = {"y": 'SR:C30-MG{PS:CH2A}I:Ps2DCCT1-I'},
         put_pvname_d = {"y": 'SR:C30-MG{PS:CH2A}I:Sp2-SP'},
         sim_pvsuffix_d = {"y": 'C30_C2_Y'},
         conv_func_specs=[
            {"in_reprs": ["y_I"],
             "out_reprs": ["y_angle"],
             "func_spec": {"name": "poly1d", "args": [[-0.05076525187516862, 0.0]]}},
            {"in_reprs": ["y_angle"],
             "out_reprs": ["y_I"],
             "func_spec": {"name": "poly1d", "args": [[-19.698513511939083, 0.0]]}}
            ]
    ),
]

In [None]:
for d in cor_info_list:
    pv_def_RB_list, pv_def_SP_list = process_corrector_definition(
        simpv_defs,
        elem_defs,
        d["at_elem_name"],
        d["pml_elem_name"],
        d["get_pvname_d"],
        d["put_pvname_d"],
        d["sim_pvsuffix_d"],
        d["conv_func_specs"])

    pv_defs.extend(pv_def_RB_list + pv_def_SP_list)


# Quad definitions

In [None]:
def process_quad_definition(
    simpv_defs: List,
    elem_defs: Dict,
    at_elem_name: str,
    pml_elem_name: str,
    get_pvname: str,
    put_pvname: str,
    sim_pvsuffix: str,
    conv_func_specs: List[Dict]):

    pv_def = dict(elem_name=pml_elem_name, handle=None, ext={}, int={})

    matched_indexes = at.get_uint32_index(lattice, at_elem_name)
    assert len(matched_indexes) == 1
    lattice_index = int(
        matched_indexes[0]
    )  # Avoid numpy.uint32 that prevents saving into JSON/YAML
    elem = lattice[lattice_index]
    elem_type = elem.definition[0]

    info_to_check = dict(
        elem_name=at_elem_name, elem_type=elem_type, lattice_index=lattice_index
    )

    pdev_standard_RB_def = MachineModeSpecContainer(
        LIVE=standard_RB(), DT=standard_RB(), SIM=standard_RB()
    )

    SP_RB_diffs = {
        repr: SP_RB_Diff_Def(
            RB_channel=f"{repr}_RB",
            abs_tol=abs_tol,
            rel_tol=None,
            timeout=Q_("10 s"),
            settle_time=Q_("2 s"),
            poll_time=Q_("0.5 s"),
        )
        for repr, abs_tol in [
            ("I", Q_("0.01 A")),
            ("K1L", Q_("5e-4 m^{-1}")),
            ("K1", Q_("1e-4 m^{-2}")),
        ]
    }
    pdev_SP_def_d = {
        repr: MachineModeSpecContainer(
        LIVE=standard_SP(
            set_wait_method="fixed_wait_time",
            SP_RB_diff=SP_RB_diffs[repr],
            fixed_wait_time=FixedWaitTime(dt=Q_("1.0 s")),
        ),
        DT=standard_SP(),
        SIM=standard_SP())
        for repr in ["I", "K1L", "K1"]
    }

    pv_def_SP_list = []
    pv_def_RB_list = []

    # sim_pvname = f"SIMPV:{sim_pvsuffix}"

    ext_SP_pvid = "extpv_I_SP"
    ext_RB_pvid = "extpv_I_RB"
    int_pvid = "intpv_K1"

    pv_def_SP = deepcopy(pv_def)
    pv_def_SP["handle"] = "SP"
    pv_def_SP["ext"]["pvname"] = dict(
        LIVE=put_pvname,
        DT=f"{USERNAME}:{put_pvname}" # PV name for DT (Digital Twin)
    )
    pv_def_SP["ext"]["pvunit"] = dict(LIVE="A", DT="A")
    pv_def_SP["ext"]["pvid_in_elem"] = ext_SP_pvid
    pv_def_SP["int"]["pvsuffix"] = sim_pvsuffix
    pv_def_SP["int"]["pvunit"] = "m^{-2}"
    pv_def_SP["int"]["pvid_in_elem"] = int_pvid
    pv_def_SP["int"]["info_to_check"] = info_to_check

    pv_def_RB = deepcopy(pv_def)
    pv_def_RB["handle"] = "RB"
    pv_def_RB["ext"]["pvname"] = dict(
        LIVE=get_pvname,
        DT=f"{USERNAME}:{get_pvname}" # PV name for DT (Digital Twin)
    )
    pv_def_RB["ext"]["pvunit"] = dict(LIVE="A", DT="A")
    pv_def_RB["ext"]["pvid_in_elem"] = ext_RB_pvid
    pv_def_RB["int"]["same_as_SP"] = True
    pv_def_RB["int"]["pvid_in_elem"] = pv_def_SP["int"]["pvid_in_elem"]

    simpv_defs.append(
        dict(
            pvclass="QuadrupoleSimPV",
            pvsuffix=sim_pvsuffix,
            args=[lattice_index]))

    elem_def = _get_blank_elem_def(elem_defs, pv_def["elem_name"])

    repr_I = "I"
    repr_K1 = "K1"
    repr_K1L = "K1L"

    elem_def.repr_units[repr_I] = "A"
    elem_def.repr_units[repr_K1] = "m^{-2}"
    elem_def.repr_units[repr_K1L] = "m^{-1}"

    elem_def.pvid_to_repr_map.ext[ext_SP_pvid] = repr_I
    elem_def.pvid_to_repr_map.ext[ext_RB_pvid] = repr_I
    elem_def.pvid_to_repr_map.int[int_pvid] = repr_K1

    for repr in [repr_I, repr_K1, repr_K1L]:
        elem_def.channel_map[f"{repr}_RB"] = ChannelSpec(
            handle="RB",
            reprs=[repr],
            pvs=ExtIntStrList(ext=[ext_RB_pvid], int=[int_pvid]),
            pdev_def=pdev_standard_RB_def,
        )

        elem_def.channel_map[f"{repr}_SP"] = ChannelSpec(
            handle="SP",
            reprs=[repr],
            pvs=ExtIntStrList(ext=[ext_SP_pvid], int=[int_pvid]),
            pdev_def=pdev_SP_def_d[repr],
        )

    for in_reprs, out_reprs in [
        ([repr_I], [repr_K1]),
        ([repr_K1], [repr_I]),
        ([repr_I], [repr_K1L]),
        ([repr_K1L], [repr_I]),
        ([repr_K1], [repr_K1L]),
        ([repr_K1L], [repr_K1]),
        ]:

        func_spec = None
        for spec in conv_func_specs:
            if (spec['in_reprs'] == in_reprs) and (spec['out_reprs'] == out_reprs):
                func_spec = FunctionSpec(**spec['func_spec'])
                break
        assert func_spec is not None

        elem_def.func_specs.append(
            ConversionFuncSpec(
                in_reprs=in_reprs, out_reprs=out_reprs, func_spec=func_spec
            )
        )

    pv_def_RB_list.append(pv_def_RB)
    if pv_def_SP is not None:
        pv_def_SP_list.append(pv_def_SP)

    return pv_def_RB_list, pv_def_SP_list

In [None]:
quad_info_list = [
    dict(
        at_elem_name = "QH1G2C30A",
        pml_elem_name = "C30_QH1",
        get_pvname = 'SR:C30-MG{PS:QH1A}I:Ps1DCCT1-I',
        put_pvname = 'SR:C30-MG{PS:QH1A}I:Sp1-SP',
        sim_pvsuffix = 'C30_QH1_K1',
        conv_func_specs=[
            {"in_reprs": ["I"], "out_reprs": ["K1"],
             "func_spec": {
                "name": "pchip_interp",
                "args": [
                    np.array([0.0, 9.99792, 19.99727, 29.99301, 39.99211, 49.99207, 59.99186, 69.98832, 79.98739, 89.98651, 99.98642, 109.98308, 114.98354, 119.98236, 124.98021, 129.98137, 134.97997, 139.97777]),
                    np.array([0.0, -0.17021670190231752, -0.3359103513970845, -0.5027023487285381, -0.669967737524001, -0.8372302748609871, -1.0040900996169722, -1.170136702043565, -1.3351687080419503, -1.4984868443781048, -1.6591052650423832, -1.8151508328481472, -1.8900354569503879, -1.9613633659609853, -2.0268947679741904, -2.0849237233623508, -2.136283115476966, -2.1827279306257683]),
                ]}},
            {"in_reprs": ["K1"], "out_reprs": ["I"],
             "func_spec": {
                "name": "pchip_interp",
                "args": [
                    np.array([-2.1827279306257683, -2.136283115476966, -2.0849237233623508, -2.0268947679741904, -1.9613633659609853, -1.8900354569503879, -1.8151508328481472, -1.6591052650423832, -1.4984868443781048, -1.3351687080419503, -1.170136702043565, -1.0040900996169722, -0.8372302748609871, -0.669967737524001, -0.5027023487285381, -0.3359103513970845, -0.17021670190231752, -0.0]),
                    np.array([139.97777, 134.97997, 129.98137, 124.98021, 119.98236, 114.98354, 109.98308, 99.98642, 89.98651, 79.98739, 69.98832, 59.99186, 49.99207, 39.99211, 29.99301, 19.99727, 9.99792, -1.003537530852583e-15]),
                ]}},
            {"in_reprs": ["I"], "out_reprs": ["K1L"],
             "func_spec": {
                "name": "pchip_interp",
                "args": [
                    np.array([0.0, 9.99792, 19.99727, 29.99301, 39.99211, 49.99207, 59.99186, 69.98832, 79.98739, 89.98651, 99.98642, 109.98308, 114.98354, 119.98236, 124.98021, 129.98137, 134.97997, 139.97777]),
                    np.array([0.0, -0.0456180761098211, -0.09002397417441864, -0.13472422945924822, -0.17955135365643227, -0.22437771366274456, -0.26909614669734855, -0.3135966361476754, -0.3578252137552427, -0.4015944742933321, -0.4446402110313587, -0.48646042320330346, -0.506529502462704, -0.5256453820775441, -0.5432077978170831, -0.5587595578611101, -0.5725238749478269, -0.5849710854077059]),
                ]}},
            {"in_reprs": ["K1L"], "out_reprs": ["I"],
             "func_spec": {
                "name": "pchip_interp",
                "args": [
                    np.array([-0.5849710854077059, -0.5725238749478269, -0.5587595578611101, -0.5432077978170831, -0.5256453820775441, -0.506529502462704, -0.48646042320330346, -0.4446402110313587, -0.4015944742933321, -0.3578252137552427, -0.3135966361476754, -0.26909614669734855, -0.22437771366274456, -0.17955135365643227, -0.13472422945924822, -0.09002397417441864, -0.0456180761098211, -0.0]),
                    np.array([139.97777, 134.97997, 129.98137, 124.98021, 119.98236, 114.98354, 109.98308, 99.98642, 89.98651, 79.98739, 69.98832, 59.99186, 49.99207, 39.99211, 29.99301, 19.99727, 9.99792, -1.003537530852583e-15]),
                ]}},
            {"in_reprs": ["K1"], "out_reprs": ["K1L"],
             "func_spec": {"name": "poly1d", "args": [[0.268, 0.0]]}},
            {"in_reprs": ["K1L"], "out_reprs": ["K1"],
             "func_spec": {"name": "poly1d", "args": [[3.731343283582089, 0.0]]}}
        ]
    ),
    dict(at_elem_name = "QH2G2C30A",
         pml_elem_name = "C30_QH2",
         get_pvname = 'SR:C30-MG{PS:QH2A}I:Ps1DCCT1-I',
         put_pvname = 'SR:C30-MG{PS:QH2A}I:Sp1-SP',
         sim_pvsuffix = 'C30_QH2_K1',
         conv_func_specs=[
            {"in_reprs": ["I"], "out_reprs": ["K1"],
             "func_spec": {
                "name": "pchip_interp",
                "args": [
                    np.array([0.0, 9.99789, 19.99727, 29.99303, 39.99216, 49.99205, 59.99183, 69.98826, 79.98738, 89.98652, 99.98647, 109.98301, 114.98358, 119.98233, 124.98016, 129.98137, 134.97989, 139.9777]),
                    np.array([0.0, 0.16638347037914303, 0.32921320025067985, 0.49281725769841617, 0.6566507934398196, 0.8202534605199788, 0.9830685564374275, 1.1446959921631217, 1.3046984418931835, 1.462105537212522, 1.6154660803465917, 1.7622649948132814, 1.8320389805134167, 1.898193574852707, 1.959600505066804, 2.0149070584783697, 2.0642803848022506, 2.1089894061963905]),
                ]}},
            {"in_reprs": ["K1"], "out_reprs": ["I"],
             "func_spec": {
                "name": "pchip_interp",
                "args": [
                    np.array([0.0, 0.16638347037914303, 0.32921320025067985, 0.49281725769841617, 0.6566507934398196, 0.8202534605199788, 0.9830685564374275, 1.1446959921631217, 1.3046984418931835, 1.462105537212522, 1.6154660803465917, 1.7622649948132814, 1.8320389805134167, 1.898193574852707, 1.959600505066804, 2.0149070584783697, 2.0642803848022506, 2.1089894061963905]),
                    np.array([0.0, 9.99789, 19.99727, 29.99303, 39.99216, 49.99205, 59.99183, 69.98826, 79.98738, 89.98652, 99.98647, 109.98301, 114.98358, 119.98233, 124.98016, 129.98137, 134.97989, 139.9777]),
                ]}},
            {"in_reprs": ["I"], "out_reprs": ["K1L"],
             "func_spec": {
                "name": "pchip_interp",
                "args": [
                    np.array([0.0, 9.99789, 19.99727, 29.99303, 39.99216, 49.99205, 59.99183, 69.98826, 79.98738, 89.98652, 99.98647, 109.98301, 114.98358, 119.98233, 124.98016, 129.98137, 134.97989, 139.9777]),
                    np.array([0.0, 0.07653639637440579, 0.15143807211531274, 0.22669593854127146, 0.302059364982317, 0.37731659183919025, 0.45221153596121666, 0.526560156395036, 0.6001612832708645, 0.6725685471177602, 0.7431143969594322, 0.8106418976141094, 0.8427379310361718, 0.8731690444322453, 0.9014162323307299, 0.9268572469000501, 0.9495689770090353, 0.9701351268503396]),
                ]}},
            {"in_reprs": ["K1L"], "out_reprs": ["I"],
             "func_spec": {
                "name": "pchip_interp",
                "args": [
                    np.array([0.0, 0.07653639637440579, 0.15143807211531274, 0.22669593854127146, 0.302059364982317, 0.37731659183919025, 0.45221153596121666, 0.526560156395036, 0.6001612832708645, 0.6725685471177602, 0.7431143969594322, 0.8106418976141094, 0.8427379310361718, 0.8731690444322453, 0.9014162323307299, 0.9268572469000501, 0.9495689770090353, 0.9701351268503396]),
                    np.array([0.0, 9.99789, 19.99727, 29.99303, 39.99216, 49.99205, 59.99183, 69.98826, 79.98738, 89.98652, 99.98647, 109.98301, 114.98358, 119.98233, 124.98016, 129.98137, 134.97989, 139.9777]),
                ]}},
            {"in_reprs": ["K1"], "out_reprs": ["K1L"],
             "func_spec": {"name": "poly1d", "args": [[0.46, 0.0]]}},
            {"in_reprs": ["K1L"], "out_reprs": ["K1"],
             "func_spec": {"name": "poly1d", "args": [[2.1739130434782608, 0.0]]}}]
    ),
]

In [None]:
for d in quad_info_list:
    pv_def_RB_list, pv_def_SP_list = process_quad_definition(
        simpv_defs,
        elem_defs,
        d["at_elem_name"],
        d["pml_elem_name"],
        d["get_pvname"],
        d["put_pvname"],
        d["sim_pvsuffix"],
        d["conv_func_specs"])

    pv_defs.extend(pv_def_RB_list + pv_def_SP_list)

# Sextupole definitions

In [None]:
def process_sext_definition(
    simpv_defs: List,
    elem_defs: Dict,
    at_elem_name: str,
    pml_elem_name: str,
    get_pvname: str,
    put_pvname: str,
    sim_pvsuffix: str,
    conv_func_specs: List[Dict]):

    pv_def = dict(elem_name=pml_elem_name, handle=None, ext={}, int={})

    matched_indexes = at.get_uint32_index(lattice, at_elem_name)
    assert len(matched_indexes) == 1
    lattice_index = int(
        matched_indexes[0]
    )  # Avoid numpy.uint32 that prevents saving into JSON/YAML
    elem = lattice[lattice_index]
    elem_type = elem.definition[0]

    info_to_check = dict(
        elem_name=at_elem_name, elem_type=elem_type, lattice_index=lattice_index
    )

    pdev_standard_RB_def = MachineModeSpecContainer(
        LIVE=standard_RB(), DT=standard_RB(), SIM=standard_RB()
    )

    SP_RB_diffs = {
        repr: SP_RB_Diff_Def(
            RB_channel=f"{repr}_RB",
            abs_tol=abs_tol,
            rel_tol=None,
            timeout=Q_("10 s"),
            settle_time=Q_("2 s"),
            poll_time=Q_("0.5 s"),
        )
        for repr, abs_tol in [
            ("I", Q_("0.01 A")),
            ("K2L", Q_("5e-4 m^{-2}")),
            ("K2", Q_("1e-4 m^{-3}")),
        ]
    }
    pdev_SP_def_d = {
        repr: MachineModeSpecContainer(
        LIVE=standard_SP(
            set_wait_method="fixed_wait_time",
            SP_RB_diff=SP_RB_diffs[repr],
            fixed_wait_time=FixedWaitTime(dt=Q_("1.0 s")),
        ),
        DT=standard_SP(),
        SIM=standard_SP())
        for repr in ["I", "K2L", "K2"]
    }

    pv_def_SP_list = []
    pv_def_RB_list = []

    # sim_pvname = f"SIMPV:{sim_pvsuffix}"

    ext_SP_pvid = "extpv_I_SP"
    ext_RB_pvid = "extpv_I_RB"
    int_pvid = "intpv_K2"

    pv_def_SP = deepcopy(pv_def)
    pv_def_SP["handle"] = "SP"
    pv_def_SP["ext"]["pvname"] = dict(
        LIVE=put_pvname,
        DT=f"{USERNAME}:{put_pvname}" # PV name for DT (Digital Twin)
    )
    pv_def_SP["ext"]["pvunit"] = dict(LIVE="A", DT="A")
    pv_def_SP["ext"]["pvid_in_elem"] = ext_SP_pvid
    pv_def_SP["int"]["pvsuffix"] = sim_pvsuffix
    pv_def_SP["int"]["pvunit"] = "m^{-3}"
    pv_def_SP["int"]["pvid_in_elem"] = int_pvid
    pv_def_SP["int"]["info_to_check"] = info_to_check

    pv_def_RB = deepcopy(pv_def)
    pv_def_RB["handle"] = "RB"
    pv_def_RB["ext"]["pvname"] = dict(
        LIVE=get_pvname,
        DT=f"{USERNAME}:{get_pvname}" # PV name for DT (Digital Twin)
    )
    pv_def_RB["ext"]["pvunit"] = dict(LIVE="A", DT="A")
    pv_def_RB["ext"]["pvid_in_elem"] = ext_RB_pvid
    pv_def_RB["int"]["same_as_SP"] = True
    pv_def_RB["int"]["pvid_in_elem"] = pv_def_SP["int"]["pvid_in_elem"]

    simpv_defs.append(
        dict(
            pvclass="SextupoleSimPV",
            pvsuffix=sim_pvsuffix,
            args=[lattice_index]))

    elem_def = _get_blank_elem_def(elem_defs, pv_def["elem_name"])

    repr_I = "I"
    repr_K2 = "K2"
    repr_K2L = "K2L"

    elem_def.repr_units[repr_I] = "A"
    elem_def.repr_units[repr_K2] = "m^{-3}"
    elem_def.repr_units[repr_K2L] = "m^{-2}"

    elem_def.pvid_to_repr_map.ext[ext_SP_pvid] = repr_I
    elem_def.pvid_to_repr_map.ext[ext_RB_pvid] = repr_I
    elem_def.pvid_to_repr_map.int[int_pvid] = repr_K2

    for repr in [repr_I, repr_K2, repr_K2L]:
        elem_def.channel_map[f"{repr}_RB"] = ChannelSpec(
            handle="RB",
            reprs=[repr],
            pvs=ExtIntStrList(ext=[ext_RB_pvid], int=[int_pvid]),
            pdev_def=pdev_standard_RB_def,
        )

        elem_def.channel_map[f"{repr}_SP"] = ChannelSpec(
            handle="SP",
            reprs=[repr],
            pvs=ExtIntStrList(ext=[ext_SP_pvid], int=[int_pvid]),
            pdev_def=pdev_SP_def_d[repr],
        )

    for in_reprs, out_reprs in [
        ([repr_I], [repr_K2]),
        ([repr_K2], [repr_I]),
        ([repr_I], [repr_K2L]),
        ([repr_K2L], [repr_I]),
        ([repr_K2], [repr_K2L]),
        ([repr_K2L], [repr_K2]),
        ]:

        func_spec = None
        for spec in conv_func_specs:
            if (spec['in_reprs'] == in_reprs) and (spec['out_reprs'] == out_reprs):
                func_spec = FunctionSpec(**spec['func_spec'])
                break
        assert func_spec is not None

        elem_def.func_specs.append(
            ConversionFuncSpec(
                in_reprs=in_reprs, out_reprs=out_reprs, func_spec=func_spec
            )
        )

    pv_def_RB_list.append(pv_def_RB)
    if pv_def_SP is not None:
        pv_def_SP_list.append(pv_def_SP)

    return pv_def_RB_list, pv_def_SP_list

In [None]:
sext_info_list = [
    dict(
        at_elem_name = "SH1G2C30A",
        pml_elem_name = "C30_SH1",
        get_pvname = 'SR:C30-MG{PS:SH1-P2}I:Ps1DCCT1-I',
        put_pvname = 'SR:C30-MG{PS:SH1-P2}I:Sp1-SP',
        sim_pvsuffix = 'C30_SH1_K2',
        conv_func_specs=[
            {"in_reprs": ["I"], "out_reprs": ["K2"],
             "func_spec": {
                "name": "pchip_interp",
                "args": [
                    np.array([0.0, 9.998, 19.99731, 29.99719, 39.99452, 49.99449, 54.99593, 59.99394, 64.9954, 69.99395, 74.99271, 79.99358, 84.99239, 89.99333, 94.99182, 99.99295, 104.99144, 109.99045]),
                    np.array([0.0, 2.1614170368398837, 4.200404477650452, 6.26175199044168, 8.335026733036429, 10.413578770346678, 11.452735807435879, 12.490280843041559, 13.529632973245715, 14.568437706875974, 15.601937440853234, 16.636188988540578, 17.669717166091843, 18.70108779624318, 19.731011401021547, 20.75804814912229, 21.78539358649316, 22.811419935849624]),
                ]}},
            {"in_reprs": ["K2"], "out_reprs": ["I"],
             "func_spec": {
                "name": "pchip_interp",
                "args": [
                    np.array([0.0, 2.1614170368398837, 4.200404477650452, 6.26175199044168, 8.335026733036429, 10.413578770346678, 11.452735807435879, 12.490280843041559, 13.529632973245715, 14.568437706875974, 15.601937440853234, 16.636188988540578, 17.669717166091843, 18.70108779624318, 19.731011401021547, 20.75804814912229, 21.78539358649316, 22.811419935849624]),
                    np.array([0.0, 9.998, 19.99731, 29.99719, 39.99452, 49.99449, 54.99593, 59.99394, 64.9954, 69.99395, 74.99271, 79.99358, 84.99239, 89.99333, 94.99182, 99.99295, 104.99144, 109.99045000000001]),
                ]}},
            {"in_reprs": ["I"], "out_reprs": ["K2L"],
             "func_spec": {
                "name": "pchip_interp",
                "args": [
                    np.array([0.0, 9.998, 19.99731, 29.99719, 39.99452, 49.99449, 54.99593, 59.99394, 64.9954, 69.99395, 74.99271, 79.99358, 84.99239, 89.99333, 94.99182, 99.99295, 104.99144, 109.99045]),
                    np.array([0.0, 0.43228340736797677, 0.8400808955300905, 1.2523503980883361, 1.6670053466072858, 2.0827157540693357, 2.2905471614871757, 2.498056168608312, 2.705926594649143, 2.913687541375195, 3.120387488170647, 3.327237797708116, 3.533943433218369, 3.7402175592486357, 3.9462022802043095, 4.151609629824458, 4.3570787172986325, 4.562283987169925]),
                ]}},
            {"in_reprs": ["K2L"], "out_reprs": ["I"],
             "func_spec": {
                "name": "pchip_interp",
                "args": [
                    np.array([0.0, 0.43228340736797677, 0.8400808955300905, 1.2523503980883361, 1.6670053466072858, 2.0827157540693357, 2.2905471614871757, 2.498056168608312, 2.705926594649143, 2.913687541375195, 3.120387488170647, 3.327237797708116, 3.533943433218369, 3.7402175592486357, 3.9462022802043095, 4.151609629824458, 4.3570787172986325, 4.562283987169925]),
                    np.array([0.0, 9.998, 19.99731, 29.99719, 39.99452, 49.99449, 54.99593, 59.99394, 64.9954, 69.99395, 74.99271, 79.99358, 84.99239, 89.99333, 94.99182, 99.99295, 104.99144, 109.99045000000001]),
                ]}},
            {"in_reprs": ["K2"], "out_reprs": ["K2L"],
             "func_spec": {"name": "poly1d", "args": [[0.2, 0.0]]}},
            {"in_reprs": ["K2L"], "out_reprs": ["K2"],
             "func_spec": {"name": "poly1d", "args": [[5.0, 0.0]]}}]
    ),
    dict(
        at_elem_name = "SM1G4C30B",
        pml_elem_name = "C30_SM1B",
        get_pvname = 'SR:C02-MG{PS:SM1B-P2}I:Ps1DCCT1-I',
        put_pvname = 'SR:C02-MG{PS:SM1B-P2}I:Sp1-SP',
        sim_pvsuffix = 'C30_SM1B_K2',
        conv_func_specs=[
            {"in_reprs": ["I"], "out_reprs": ["K2"],
             "func_spec": {
                "name": "pchip_interp",
                "args": [
                    np.array([0.0, 9.9975, 19.99691, 29.9971, 39.99492, 49.9951, 54.99666, 59.99509, 64.99646, 69.9953, 74.99396, 79.99524, 84.99395, 89.99498, 94.99391, 99.99449, 104.99335, 109.99232]),
                    np.array([0.0, -2.160632407607249, -4.201669669287124, -6.263552049081986, -8.337748713120195, -10.41523525511305, -11.456247199207814, -12.494481364684557, -13.529840651471732, -14.570094969453939, -15.60296097681778, -16.64089362080598, -17.676128162800968, -18.70694582011935, -19.736285703900815, -20.76437103430962, -21.794110466111498, -22.823022034156406]),
                ]}},
            {"in_reprs": ["K2"], "out_reprs": ["I"],
             "func_spec": {
                "name": "pchip_interp",
                "args": [
                    np.array([-22.823022034156406, -21.794110466111498, -20.76437103430962, -19.736285703900815, -18.70694582011935, -17.676128162800968, -16.64089362080598, -15.60296097681778, -14.570094969453939, -13.529840651471732, -12.494481364684557, -11.456247199207814, -10.41523525511305, -8.337748713120195, -6.263552049081986, -4.201669669287124, -2.160632407607249, -0.0]),
                    np.array([109.99232, 104.99335, 99.99449, 94.99391, 89.99498, 84.99395, 79.99524, 74.99396, 69.9953, 64.99646, 59.99509, 54.99666, 49.9951, 39.99492, 29.9971, 19.99691, 9.9975, -5.724587470723463e-17]),
                ]}},
            {"in_reprs": ["I"], "out_reprs": ["K2L"],
             "func_spec": {
                "name": "pchip_interp",
                "args": [
                    np.array([0.0, 9.9975, 19.99691, 29.9971, 39.99492, 49.9951, 54.99666, 59.99509, 64.99646, 69.9953, 74.99396, 79.99524, 84.99395, 89.99498, 94.99391, 99.99449, 104.99335, 109.99232]),
                    np.array([0.0, -0.43212648152144983, -0.8403339338574248, -1.2527104098163973, -1.6675497426240393, -2.0830470510226102, -2.291249439841563, -2.4988962729369115, -2.7059681302943464, -2.914018993890788, -3.1205921953635563, -3.3281787241611958, -3.535225632560194, -3.741389164023871, -3.9472571407801635, -4.152874206861924, -4.3588220932223, -4.564604406831282]),
                ]}},
            {"in_reprs": ["K2L"], "out_reprs": ["I"],
             "func_spec": {
                "name": "pchip_interp",
                "args": [
                    np.array([-4.564604406831282, -4.3588220932223, -4.152874206861924, -3.9472571407801635, -3.741389164023871, -3.535225632560194, -3.3281787241611958, -3.1205921953635563, -2.914018993890788, -2.7059681302943464, -2.4988962729369115, -2.291249439841563, -2.0830470510226102, -1.6675497426240393, -1.2527104098163973, -0.8403339338574248, -0.43212648152144983, -0.0]),
                    np.array([109.99232, 104.99335, 99.99449, 94.99391, 89.99498, 84.99395, 79.99524, 74.99396, 69.9953, 64.99646, 59.99509, 54.99666, 49.9951, 39.99492, 29.9971, 19.99691, 9.9975, -5.724587470723463e-17]),
                ]}},
            {"in_reprs": ["K2"], "out_reprs": ["K2L"],
             "func_spec": {"name": "poly1d", "args": [[0.2, 0.0]]}},
            {"in_reprs": ["K2L"], "out_reprs": ["K2"],
             "func_spec": {"name": "poly1d", "args": [[5.0, 0.0]]}}]
    ),
]

In [None]:
for d in sext_info_list:
    pv_def_RB_list, pv_def_SP_list = process_sext_definition(
        simpv_defs,
        elem_defs,
        d["at_elem_name"],
        d["pml_elem_name"],
        d["get_pvname"],
        d["put_pvname"],
        d["sim_pvsuffix"],
        d["conv_func_specs"])

    pv_defs.extend(pv_def_RB_list + pv_def_SP_list)

# RF frequency element definition

In [None]:
def process_rf_freq_definition(
    simpv_defs: List,
    elem_defs: Dict,
    pml_elem_name: str,
    get_pvname: str,
    put_pvname: str,
    sim_pvsuffix: str):

    pv_def = dict(elem_name=pml_elem_name, handle=None, ext={}, int={})

    info_to_check = {}

    pdev_standard_RB_def = MachineModeSpecContainer(
        LIVE=standard_RB(), DT=standard_RB(), SIM=standard_RB()
    )

    pv_def_SP_list = []
    pv_def_RB_list = []

    # sim_pvname = f"SIMPV:{sim_pvsuffix}"

    ext_SP_pvid = "extpv_rf_freq_SP"
    ext_RB_pvid = "extpv_rf_freq_RB"
    int_pvid = "intpv_rf_freq"

    pv_def_SP = deepcopy(pv_def)
    pv_def_SP["handle"] = "SP"
    pv_def_SP["ext"]["pvname"] = dict(
        LIVE=put_pvname,
        DT=f"{USERNAME}:{put_pvname}" # PV name for DT (Digital Twin)
    )
    pv_def_SP["ext"]["pvunit"] = dict(LIVE="GHz", DT="GHz")
    pv_def_SP["ext"]["pvid_in_elem"] = ext_SP_pvid
    pv_def_SP["int"]["pvsuffix"] = sim_pvsuffix
    pv_def_SP["int"]["pvunit"] = "Hz"
    pv_def_SP["int"]["pvid_in_elem"] = int_pvid
    pv_def_SP["int"]["info_to_check"] = info_to_check

    pv_def_RB = deepcopy(pv_def)
    pv_def_RB["handle"] = "RB"
    pv_def_RB["ext"]["pvname"] = dict(
        LIVE=get_pvname,
        DT=f"{USERNAME}:{get_pvname}" # PV name for DT (Digital Twin)
    )
    pv_def_RB["ext"]["pvunit"] = dict(LIVE="Hz", DT="Hz")
    pv_def_RB["ext"]["pvid_in_elem"] = ext_RB_pvid
    pv_def_RB["int"]["same_as_SP"] = True
    pv_def_RB["int"]["pvid_in_elem"] = pv_def_SP["int"]["pvid_in_elem"]

    simpv_defs.append(dict(pvclass="RfFreqSimPV", pvsuffix=sim_pvsuffix))

    elem_def = _get_blank_elem_def(elem_defs, pv_def["elem_name"])

    repr = "freq"

    elem_def.repr_units[repr] = "Hz"

    elem_def.pvid_to_repr_map.ext[ext_SP_pvid] = repr
    elem_def.pvid_to_repr_map.ext[ext_RB_pvid] = repr
    elem_def.pvid_to_repr_map.int[int_pvid] = repr

    elem_def.channel_map[f"{repr}_RB"] = ChannelSpec(
        handle="RB",
        reprs=[repr],
        pvs=ExtIntStrList(ext=[ext_RB_pvid], int=[int_pvid]),
        pdev_def=pdev_standard_RB_def,
    )

    elem_def.channel_map[f"{repr}_SP"] = ChannelSpec(
        handle="SP",
        reprs=[repr],
        pvs=ExtIntStrList(ext=[ext_SP_pvid], int=[int_pvid]),
        pdev_def=MachineModeSpecContainer(
            LIVE=standard_SP(
                set_wait_method="SP_RB_diff",
                fixed_wait_time=FixedWaitTime(dt=Q_("5 s")),
                SP_RB_diff=SP_RB_Diff_Def(
                    RB_channel=f"{repr}_RB",
                    abs_tol=Q_("0.1 Hz"),
                    rel_tol=None,
                    timeout=Q_("10 s"),
                    settle_time=Q_("2 s"),
                    poll_time=Q_("0.5 s"),
                ),
            ),
            DT=standard_SP(),
            SIM=standard_SP(),
        ),
    )

    pv_def_RB_list.append(pv_def_RB)
    if pv_def_SP is not None:
        pv_def_SP_list.append(pv_def_SP)

    return pv_def_RB_list, pv_def_SP_list

In [None]:
rf_freq_info = dict(
    pml_elem_name = "RF_Freq",
    get_pvname = "RF{FCnt:1}Freq:I",
    put_pvname = "RF{Osc:1}Freq:SP",
    sim_pvsuffix = "rf_freq",
)

In [None]:
d = rf_freq_info
pv_def_RB_list, pv_def_SP_list = process_rf_freq_definition(
    simpv_defs,
    elem_defs,
    d["pml_elem_name"],
    d["get_pvname"],
    d["put_pvname"],
    d["sim_pvsuffix"])

pv_defs.extend(pv_def_RB_list + pv_def_SP_list)

# DCCT (i.e., beam current) element definition

In [None]:
def process_dcct_definition(
    simpv_defs: List,
    elem_defs: Dict,
    pml_elem_name: str,
    get_pvname: str,
    sim_pvsuffix: str):

    pv_def = dict(elem_name=pml_elem_name, handle=None, ext={}, int={})

    info_to_check = {}

    pdev_standard_RB_def = MachineModeSpecContainer(
        LIVE=standard_RB(), DT=standard_RB(), SIM=standard_RB()
    )

    pv_def_RB_list = []

    ext_pvid = "extpv_beam_current_RB"
    int_pvid = "intpv_beam_current"

    pv_def_RB = deepcopy(pv_def)

    pv_def_RB["handle"] = "RB"
    pv_def_RB["ext"]["pvname"] = dict(
        LIVE=get_pvname,
        DT=f"{USERNAME}:{get_pvname}" # PV name for DT (Digital Twin)
    )
    pv_def_RB["ext"]["pvunit"] = dict(LIVE="mA", DT="mA")
    pv_def_RB["ext"]["pvid_in_elem"] = ext_pvid

    pv_def_RB["int"]["pvsuffix"] = sim_pvsuffix
    pv_def_RB["int"]["pvunit"] = "A"
    pv_def_RB["int"]["pvid_in_elem"] = int_pvid
    pv_def_RB["int"]["info_to_check"] = info_to_check

    simpv_defs.append(dict(pvclass="BeamCurrentSimPV", pvsuffix=sim_pvsuffix))

    elem_def = _get_blank_elem_def(elem_defs, pv_def["elem_name"])

    repr = "I"

    elem_def.repr_units[repr] = "mA"

    elem_def.pvid_to_repr_map.ext[ext_pvid] = repr
    elem_def.pvid_to_repr_map.int[int_pvid] = repr

    elem_def.channel_map[f"{repr}_RB"] = ChannelSpec(
        handle="RB",
        reprs=[repr],
        pvs=ExtIntStrList(ext=[ext_pvid], int=[int_pvid]),
        pdev_def=pdev_standard_RB_def,
    )

    pv_def_RB_list.append(pv_def_RB)

    return pv_def_RB_list

In [None]:
dcct_info = dict(
    pml_elem_name = "Beam_Current",
    get_pvname = "SR:C03-BI{DCCT:1}I:Real-I",
    sim_pvsuffix = "beam_current",
)

In [None]:
d = dcct_info
pv_def_RB_list = process_dcct_definition(
    simpv_defs,
    elem_defs,
    d["pml_elem_name"],
    d["get_pvname"],
    d["sim_pvsuffix"])

pv_defs.extend(pv_def_RB_list)

# Tune element definition

In [None]:
def process_tune_diag_definition(
    simpv_defs: List,
    elem_defs: Dict,
    pml_elem_name: str,
    get_pvname_d: Dict,
    sim_pvsuffix_d: Dict):

    pv_def = dict(elem_name=pml_elem_name, handle=None, ext={}, int={})

    info_to_check = {}

    pdev_standard_RB_def = MachineModeSpecContainer(
        LIVE=standard_RB(), DT=standard_RB(), SIM=standard_RB()
    )

    pv_def_RB_list = []

    for plane, get_pvname in get_pvname_d.items():

        ext_pvid = f"extpv_bxb_tune_{plane}_RB"
        int_pvid = f"intpv_bxb_tune_{plane}"

        pv_def_RB = deepcopy(pv_def)
        pv_def_RB["handle"] = "RB"
        pv_def_RB["ext"]["pvname"] = dict(
            LIVE=get_pvname,
            DT=f"{USERNAME}:{get_pvname}" # PV name for DT (Digital Twin)
        )
        pv_def_RB["ext"]["pvunit"] = dict(LIVE="", DT="")
        pv_def_RB["ext"]["pvid_in_elem"] = ext_pvid

        sim_pvsuffix = sim_pvsuffix_d[plane]

        pv_def_RB["int"]["pvsuffix"] = sim_pvsuffix
        pv_def_RB["int"]["pvunit"] = ""
        pv_def_RB["int"]["pvid_in_elem"] = int_pvid
        pv_def_RB["int"]["info_to_check"] = info_to_check

        simpv_defs.append(dict(pvclass="TuneSimPV", pvsuffix=sim_pvsuffix,
                               args=[plane]))

        elem_def = _get_blank_elem_def(elem_defs, pv_def["elem_name"])

        repr = f"nu{plane}"

        elem_def.repr_units[repr] = ""

        elem_def.pvid_to_repr_map.ext[ext_pvid] = repr
        elem_def.pvid_to_repr_map.int[int_pvid] = repr

        elem_def.channel_map[f"{repr}_RB"] = ChannelSpec(
            handle="RB",
            reprs=[repr],
            pvs=ExtIntStrList(ext=[ext_pvid], int=[int_pvid]),
            pdev_def=pdev_standard_RB_def,
        )

        pv_def_RB_list.append(pv_def_RB)

    return pv_def_RB_list

In [None]:
tune_diag_info = dict(
    pml_elem_name = "BxB_Tune",
    get_pvname_d = {"x": "SR:OPS-BI{IGPF}FBX:Tune-I",
                    "y": "SR:OPS-BI{IGPF}FBY:Tune-I"},
    sim_pvsuffix_d = {"x": "tune_x", "y": "tune_y"},
)

In [None]:
d = tune_diag_info
pv_def_RB_list = process_tune_diag_definition(
    simpv_defs,
    elem_defs,
    d["pml_elem_name"],
    d["get_pvname_d"],
    d["sim_pvsuffix_d"])

pv_defs.extend(pv_def_RB_list)

# Save these configuration data to files

Note that both JSON and YAML files are being saved here. YAML is typically more human readable, but in some cases, JSON is easier to read. So, both formats are being used at this stage. In the future, most likely only YAML will be used after more formatting options are explored such that all YAML files become easy to read.

# Save PV definitions

In [None]:
pv_defs_for_file = {
    "facility": sim_configs['facility'],
    "machine": sim_configs['machine'],
    "simulator_config": sim_configs["selected_config"],
    "pv_definitions": pv_defs,
}

with open(sel_config_folder / "pvs.yaml", "w") as f:
    yaml.dump(
        pv_defs_for_file,
        f,
        sort_keys=False,
        default_flow_style=False,
        width=70,
        indent=2,
        Dumper=CustomDumper,
    )
with open(sel_config_folder / "pvs.json", "w") as f:
    json.dump(pv_defs_for_file, f, indent=2)

# Save SimPV definitions

In [None]:
sim_pv_defs_for_file = {
    "facility": sim_configs['facility'],
    "machine": sim_configs['machine'],
    "simulator_config": sim_configs["selected_config"],
    "sim_pv_definitions": simpv_defs,
}

with open(sel_config_folder / "sim_pvs.yaml", "w") as f:
    yaml.dump(
        sim_pv_defs_for_file,
        f,
        sort_keys=False,
        default_flow_style=False,
        width=70,
        indent=2,
        Dumper=CustomDumper,
    )
with open(sel_config_folder / "sim_pvs.json", "w") as f:
    json.dump(sim_pv_defs_for_file, f, indent=2)

# Save element (MLV := Middle Layer Variable) definitions

In [None]:
elem_defs_for_file = {
    "facility": sim_configs['facility'],
    "machine": sim_configs['machine'],
    "simulator_config": sim_configs["selected_config"],
    "elem_definitions": elem_defs,
}

json_safe_elem_defs = {}
for k, v in elem_defs_for_file.items():
    if k != "elem_definitions":
        json_safe_elem_defs[k] = v
    else:
        e_defs = json_safe_elem_defs["elem_definitions"] = {}
        for elem_name, pamila_elem_def in v.items():
            e_defs[elem_name] = json.loads(pamila_elem_def.model_dump_json())

with open(sel_config_folder / "elements.yaml", "w") as f:
    yaml.dump(
        json_safe_elem_defs,
        f,
        sort_keys=False,
        default_flow_style=False,
        width=70,
        indent=2,
        Dumper=CustomDumper,
    )
with open(sel_config_folder / "elements.json", "w") as f:
    json.dump(json_safe_elem_defs, f, indent=2)

# Compute design lattice properties

In [None]:
design_props = {}

rad_params = at.physics.ring_parameters.radiation_parameters(lattice)

for prop_name in dir(rad_params):
    if prop_name.startswith('_'):
        continue
    val = getattr(rad_params, prop_name)
    if isinstance(val, np.ndarray):
        design_props[prop_name] = val.tolist()
    else:
        design_props[prop_name] = val

# Force numpy scalar objects to python objects so that they can be
# saved to JSON/YAML files.
design_props = json.loads(json.dumps(design_props))
design_props

# Save design lattice property definitions to files

In [None]:
design_props_for_file = {
    "facility": sim_configs['facility'],
    "machine": sim_configs['machine'],
    "simulator_config": sim_configs["selected_config"],
    "design_properties": design_props,
}

with open(sel_config_folder / "design_props.yaml", "w") as f:
    yaml.dump(
        design_props_for_file,
        f,
        sort_keys=False,
        default_flow_style=False,
        width=70,
        indent=2,
        Dumper=CustomDumper,
    )
with open(sel_config_folder / "design_props.json", "w") as f:
    json.dump(design_props_for_file, f, indent=2)

# Check if the created config files are loadable

In [None]:
print(f"{facility_folder = }")
print(f"{machine_name = }")
machine_obj = pml.load_machine(machine_name, dirpath=facility_folder)

# MLVLs (MLV lists) and MLVTs (MLV trees) will be defined in a separate notebook.