# Gas Dosing Demo

A Makefile is provided to make interacting with the demo easier:

Start the Docker containers with the SEC nodes listening on ports 10800 (gas_dosing) and 10801 (reactor_cell):

```make sim```

Start the containers and additionally launch the Frappy GUI:

```make frappy``` 

In [1]:
import numpy as np
from bluesky import RunEngine
import bluesky.plan_stubs as bps
from bluesky.plan_stubs import sleep, rd
from bluesky.plans import scan, count


import databroker

from bluesky.preprocessors import run_decorator, SupplementalData
from bluesky import preprocessors as bpp
from bluesky.callbacks.best_effort import BestEffortCallback
from pprint import pprint
from secop_ophyd.SECoPDevices import SECoPNodeDevice, SECoPReadableDevice
from secop_ophyd.SECoPSignal import SECoPParamBackend
from bluesky.utils import ProgressBarManager

from tiled.client import from_uri
from nexuscreator import NexusCreatorClass

import time
from ophyd.status import Status


from bluesky.log import config_bluesky_logging

# NOTE(review): `run` is not read anywhere in the visible cells and the name is
# rebound to a catalog entry in a later cell — confirm this flag is still needed.
run = True

config_bluesky_logging(level='WARNING')
# Create the RunEngine and attach the BestEffortCallback and a progress bar
# to it. No database is created here; documents are forwarded to the `client`
# defined further down.
RE = RunEngine({},call_returns_result=True)
bec = BestEffortCallback()
RE.subscribe(bec)
RE.waiting_hook = ProgressBarManager()
# Exceptions raised inside subscribed callbacks should not be ignored.
RE.ignore_callback_exceptions = False



#Example of adding metadata to RE environment
investigation_id = "Nexus Demonstrator"

RE.md["investigation_id"] = investigation_id


# Client for the server at localhost:8000.
# NOTE(review): the API key is hard-coded; presumably acceptable only for the
# local demo setup — confirm before reusing elsewhere.
client = from_uri("http://localhost:8000",api_key="secret")

def post_document(name,doc):
    """Forward one (name, doc) pair received from the RunEngine to `client`."""
    client.post_document(name, doc)
    
# Every document emitted by the RunEngine is passed to post_document.
RE.subscribe(post_document)

# Connect to Gas Dosing SEC Node and generate ophyd device tree
gas_dosing =  SECoPNodeDevice.create('localhost','10800',RE.loop)

# Connect to Reactor Cell SEC Node and generate ophyd device tree
reactor_cell =  SECoPNodeDevice.create('localhost','10801',RE.loop)

# NOTE(review): presumably these calls generate the classes that are imported
# from `genNodeClass` right below — confirm; if so the order must be kept.
gas_dosing.class_from_instance()
reactor_cell.class_from_instance()


from genNodeClass import *

# Re-bind the devices with annotations naming the imported classes
# (presumably for editor auto-completion; the objects themselves are unchanged).
gas_dosing:Gas_dosing = gas_dosing
reactor_cell:Reactor_cell = reactor_cell

# Signals handed to SupplementalData as baseline devices.
baseline = [gas_dosing.massflow_contr1.ramp]

sd = SupplementalData(baseline=baseline)

# Install the preprocessor on the RunEngine.
RE.preprocessors.append(sd)

gas_dosing ready
reactor_cell ready


In [2]:
from typing_extensions import Self
from ophyd_async.core import Signal


class SECOP2NEXUS():
    """Generate a NeXus structure description (nxstruct text) from SECoP nodes.

    The generated text consists of three parts that ``get_nexus_struct``
    concatenates: a fixed ``entry:NXentry`` header, a ``sample`` section that
    links every SECoP "meaning" function to one module, and a
    ``sample_environment`` section describing each node, its modules and
    their numeric parameters.

    Instances are normally obtained through the async ``create`` factory,
    because filling the sections requires awaiting device signals.
    """

    def __init__(self, nodes: list[SECoPNodeDevice]) -> None:
        """Store *nodes* and initialise empty sections; no device is read here."""
        self.nodes: list[SECoPNodeDevice] = nodes
        # function name -> [(importance, module, node owning the module), ...]
        self.meaning_dict: dict[str, list[tuple[float, SECoPReadableDevice, SECoPNodeDevice]]] = {}
        # first dot-separated component of each node's equipment id
        self.group_names: list[str] = []

        self.header: str = """
entry:NXentry
\t@NX_class = "NXentry"
"""

        self.general_data_struct: str = ""
        self.sample_data_struct: str = ""

    @classmethod
    async def create(cls, nodes: list[SECoPNodeDevice]) -> Self:
        """Build a converter for *nodes* and generate both data sections.

        Collects the ``meaning`` of every module that has one, grouped by its
        ``function``, then generates the sample-environment and sample parts.

        Returns:
            The fully populated converter.

        Raises:
            KeyError: if a module's meaning lacks ``importance`` or ``function``.
        """
        # `cls` instead of the hard-coded class name so subclasses get their own type.
        converter = cls(nodes)

        converter.group_names = [
            (await node.equipment_id.get_value()).split('.')[0] for node in nodes
        ]

        for node in nodes:
            for module in node.mod_devices.values():
                if not hasattr(module, 'meaning'):
                    continue

                # the signal value wraps the dict; .item() unwraps it
                meaning: dict = (await module.meaning.get_value()).item()

                entry: tuple = (meaning['importance'], module, node)
                converter.meaning_dict.setdefault(meaning['function'], []).append(entry)

        # main part of nx_struct
        await converter.gen_general_data_struct()

        # sample part of nx_struct
        await converter.gen_sample_data_struct()

        return converter

    @staticmethod
    def _indent(block: str, prefix: str) -> str:
        """Insert *prefix* before every line of *block* except the first.

        The generated blocks start with an empty line, so in practice every
        content line ends up indented by *prefix*. An empty block stays empty.
        """
        return prefix.join(block.splitlines(True))

    @staticmethod
    def _single_line(text: str) -> str:
        """Return *text* with all line breaks removed."""
        return "".join(text.splitlines())

    async def _generate_module_nexus_struct(self, module: SECoPReadableDevice) -> str:
        """Return the ``NXsensor`` block describing *module*.

        The block contains the module header (name, optional meaning
        attributes, model, description), the ``value``/``target`` logs, a
        ``secop_parameters`` collection with the remaining numeric parameters
        and a ``secop_module_properties`` collection.
        """
        implementation: str = str(await module.implementation.get_value())
        description: str = self._single_line(str(await module.description.get_value()))

        measurement_line = ""
        importance_line = ""
        key_line = ""
        link_line = ""

        # TODO handle legacy meaning correctly
        if hasattr(module, 'meaning'):
            meaning: dict = (await module.meaning.get_value()).item()

            function = meaning.get("function")
            if function:
                measurement_line = f'\n\tmeasurement:NX_CHAR = "{function}"'

                # `is not None` so that an importance of 0 is still written
                importance = meaning.get("importance")
                if importance is not None:
                    importance_line = f"\n\t\t@secop_meaning_importance = {importance}"

                key = meaning.get("key")
                if key:
                    key_line = f'\n\t\t@long_name = "{key}"'

                link = meaning.get("link")
                if link:
                    link_line = f'\n\t\t@PID = "{link}"'

        module_name = module._module

        # The trailing space after the last attribute is part of the existing output.
        name_line = (
            f'\tname:NX_CHAR = "{module_name}"'
            f"{measurement_line}{importance_line}{key_line}{link_line} "
        )

        text = f"""
{module_name}:NXsensor
\t@NX_class = NXsensor
{name_line}
\tmodel:NX_CHAR = "{implementation}"
\tdescription:NX_CHAR = "{description}"

"""
        value_log: str = await self._generate_nexus_parameter_log(module, "value", "value_log")
        text += self._indent(value_log, "\t")

        if hasattr(module, "target"):
            target_log: str = await self._generate_nexus_parameter_log(module, "target", "target_log")
            text += self._indent(target_log, "\t")

        text += "\tsecop_parameters:NXcollection\n"
        text += "\t\t@NX_class = NXcollection"
        for parameter in module.param_devices.keys():
            # value and target already have their own logs above
            if parameter in ("value", "target"):
                continue

            param_log: str = await self._generate_nexus_parameter_log(module, parameter)
            text += self._indent(param_log, "\t\t")

        # Each parameter log starts with a line break that terminates the
        # @NX_class line above. If no log was emitted (no numeric parameters)
        # terminate it here, otherwise the next header would share its line.
        if not text.endswith("\n"):
            text += "\n"

        text += "\tsecop_module_properties:NXcollection\n"
        text += "\t\t@NX_class = NXcollection\n"
        for prop_name, prop_dev in module.mod_prop_devices.items():
            # read once and reuse for both the struct text and the echo
            prop_value = str(await prop_dev._backend.get_value())
            text += f'\t\t{prop_name}:NX_CHAR = "{prop_value}"\n'
            # echoed to stdout, as in the recorded notebook output
            print(prop_value)
        return text

    async def _generate_nexus_parameter_log(self, module: SECoPReadableDevice, param_name: str, nxlog_name: str = "") -> str:
        """Return the ``NXlog`` block for one parameter of *module*.

        Args:
            module: device owning the parameter signal.
            param_name: attribute name of the signal on *module*.
            nxlog_name: name of the NXlog group; defaults to *param_name*
                when left empty.

        Returns:
            The block text, or ``""`` when the backend reports the parameter
            as non-numeric.
        """
        sig: Signal = getattr(module, param_name)
        sig_backend: SECoPParamBackend = sig._backend

        # check if value is numeric (NXlog only supports numerical values)
        if not sig_backend.is_number():
            return ""

        unit = sig_backend.get_unit()
        unit_line = f'\n\t\t@units = "{unit}"' if unit is not None else ""

        log_name = param_name if nxlog_name == "" else nxlog_name

        if param_name in ("value", "target"):
            nxlog_path = log_name
        else:
            nxlog_path = f"secop_parameters/{param_name}"

        log_root = (
            f"/entry/sample_environment/{module._secclient.nodename}"
            f"/{module._module}/{nxlog_path}"
        )
        time_path = f"{log_root}/time"
        data_path = f"{log_root}/value"

        secop_dtype = sig_backend.describe_dict['SECoP_dtype']
        # Line breaks are stripped like for module and node descriptions, so a
        # multi-line description cannot break the line-oriented output.
        description = self._single_line(str(sig_backend._param_description["description"]))

        text = f"""
{log_name}:NXlog
\t@NX_class = NXlog
\t@default = default_data
\tvalue:NX_FLOAT64 = {sig.name}{unit_line}
\t\t@type = "{secop_dtype}"
\ttime:NX_FLOAT64 = {sig.name}-timestamp
\t\t@start = 0
\t\t@units = "s"
\tdescription:NX_CHAR = "{description}"
\tdefault_data:NXdata
\t\t@NX_class = "NXdata"
\t\t@signal = "{param_name}"
\t\t@axes = "time"
\t\t{param_name}: -->  {data_path}
\t\ttime: -->      {time_path}
\t\ttitle:NX_CHAR = "{description}"

"""

        return text

    async def _generate_node_nexus_struct(self, node: SECoPNodeDevice) -> str:
        """Return the ``NXenvironment`` block for *node* including all its modules."""
        equipment_id: str = str(await node.equipment_id.get_value())

        if hasattr(node, "firmware"):
            firmware: str = str(await node.firmware.get_value())
        else:
            firmware = ""

        version: str = str(await node.version.get_value())
        description: str = self._single_line(str(await node.description.get_value()))

        describe_message = str(node._secclient.client.descriptive_data).replace("\n", "")

        short_name = equipment_id.split('.')[0]

        text = f"""
{equipment_id}:NXenvironment
\t@NX_class = NXenvironment
\tname:NX_CHAR = "{equipment_id}"
\tshort_name:NX_CHAR = "{short_name}"
\ttype:NX_CHAR = "{firmware} ({version})"
\tdescription:NX_CHAR = "{description}"
\tsecop_describe_message:NX_CHAR = "{describe_message}"

"""
        for module in node.mod_devices.values():
            mod_str: str = await self._generate_module_nexus_struct(module)
            text += self._indent(mod_str, "\t")

        return text

    async def gen_general_data_struct(self):
        """(Re)generate ``general_data_struct``: the sample_environment collection."""
        # General data
        self.general_data_struct = """
\tsample_environment:NXcollection
\t\t@NX_class = "NXcollection"
"""

        for node in self.nodes:
            node_text = await self._generate_node_nexus_struct(node=node)
            self.general_data_struct += self._indent(node_text, "\t\t")

    async def gen_sample_data_struct(self):
        """(Re)generate ``sample_data_struct`` from ``meaning_dict``.

        For every meaning function, two links are written: one to the node
        and one to the ``value_log`` of the module with the highest
        importance (on ties, the first one collected).
        """
        # Sample data
        self.sample_data_struct = """
\tsample:NXsample
\t\t@NX_class = "NXsample"
\t\ttype:NX_CHAR = "sample"
"""

        for meaning, candidates in self.meaning_dict.items():
            # In SECoP a higher importance means closer to the sample, so the
            # maximum is wanted here (the lowest was picked before).
            _, closest_module, env_node = max(candidates, key=lambda entry: entry[0])

            # NOTE(review): this link uses the first component of the
            # equipment id, the NXenvironment group is named after the full
            # equipment id, and the parameter-log links use
            # `_secclient.nodename` — confirm these always coincide.
            node_name = (await env_node.equipment_id.get_value()).split('.')[0]
            self.sample_data_struct += f"""
\t\t{meaning}_env: --> /entry/sample_environment/{node_name}
\t\t{meaning}: --> /entry/sample_environment/{node_name}/{closest_module._module}/value_log
"""

    def write_nx_struct(self, path: str):
        """Write the complete structure text to the file at *path* (overwriting it)."""
        with open(path, "w") as file:
            file.write(self.get_nexus_struct())

    def get_nexus_struct(self):
        """Return header, sample section and sample-environment section, in that order."""
        return self.header + self.sample_data_struct + self.general_data_struct

# Build the converter for both nodes. The top-level `await` needs an
# async-aware shell (this is a notebook cell), not a plain Python script.
conv = await SECOP2NEXUS.create(nodes= [gas_dosing,reactor_cell])


# Write the generated structure text; a later cell reads this file again.
conv.write_nx_struct("demonstrator.nxstruct")



massflow
A simulated massflow controller 
frappy_HZB.massflow_controller.MassflowController
['Drivable']
[]
massflow
A simulated massflow controller 
frappy_HZB.massflow_controller.MassflowController
['Drivable']
[]
massflow
A simulated massflow controller 
frappy_HZB.massflow_controller.MassflowController
['Drivable']
[]
pressure
A simulated pressure controller 
{'key': 'PLACEHOLDER', 'link': 'https://w3id.org/nfdi4cat/PLACEHOLDER', 'function': 'pressure', 'importance': 40, 'belongs_to': 'sample'}
frappy_HZB.pressure_controller.PressureController
['Drivable']
[]
temp control
A simulated temperature controller 
{'key': 'PLACEHOLDER', 'link': 'https://w3id.org/nfdi4cat/PLACEHOLDER', 'function': 'temperature_regulation', 'importance': 40, 'belongs_to': 'sample'}
frappy_HZB.temp_reg.TemperatureController
['Drivable']
[]
temp_sens
A simulated Temperature Sensor 
{'key': 'PLACEHOLDER', 'link': 'https://w3id.org/nfdi4cat/PLACEHOLDER', 'function': 'temperature', 'importance': 40, 'belongs_to'

In [4]:

# Entry at index -1 of the client (presumably the most recent run — confirm).
run = client[-1]

primary = run.primary
primary_data = primary.data

# The 'time' column is the timestamp axis for every data key. Read it once
# here instead of once per key inside the loop.
time_array = np.asarray(primary_data.get('time'))

variables = {}

for key in primary_data.keys():
    if key == 'time':
        continue
    variables[key] = np.asarray(primary_data.get(key)).tolist()
    # .tolist() per key keeps one independent list per entry, as before
    variables[key+'-timestamp'] = time_array.tolist()

config_dict:dict = primary.metadata['descriptors'][0]['configuration']

#pprint(config_dict)

# configuration entries whose key ends with this suffix are left out
postfix = '-meaning'

for device_config in config_dict.values():
    data_dict:dict = {
        name: reading
        for name, reading in device_config['data'].items()
        if not name.endswith(postfix)
    }
    variables.update(data_dict)

    # NOTE(review): timestamps are not filtered by `postfix`, so '*-meaning'
    # keys still get a '-timestamp' entry — confirm this is intended.
    timestamp_dict:dict = device_config['timestamps']
    variables.update({name+'-timestamp': stamp for name, stamp in timestamp_dict.items()})


#pprint(variables)

# Preparing NexusCreator
flags = {
  'nxstruct': './demonstrator.nxstruct',
  'input': variables,
  'dictionary': False,
  'filePerScan':1,
  'debug':False
}


nexuscreator = NexusCreatorClass.NexusCreator()
nexuscreator.executeConversion(flags)



Conversion: Object to Nexus
dictionary ---> ./demonstrator.nxs

Reading NXstruct file:	./demonstrator.nxstruct
Opening Nexus file:	./demonstrator.nxs
Creating entry...
Closing Nexus file
