Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hoist Data Management code from XPCS and HEDM #938

Merged
merged 9 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ Fixes

* lineup2() should work with low intensity peaks.
* lineup2() would raise ZeroDivideError in some cases.
* Increase minimum aps-dm-api version to 8.

Maintenance
-----------

* Code format conforms to 'ruff'.
* Add additional support for APS Data Management API.

1.6.18
******
Expand Down
41 changes: 27 additions & 14 deletions apstools/devices/aps_data_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,30 @@

~DM_WorkflowConnector

from: https://github.com/APS-1ID-MPE/hexm-bluesky/blob/main/instrument/devices/data_management.py
"""

__all__ = """
DM_WorkflowConnector
""".split()

import logging
import os
import time

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # allow any log content at this level
logger.info(__file__)

from ophyd import Component
from ophyd import Device
from ophyd import Signal

from ..utils import run_in_thread

DM_STATION_NAME = str(os.environ.get("DM_STATION_NAME", "terrier")).lower()
logger = logging.getLogger(__name__)

NOT_AVAILABLE = "-n/a-"
NOT_RUN_YET = "not_run"
POLLING_PERIOD_S = 1.0
REPORT_PERIOD_DEFAULT = 10
REPORT_PERIOD_MIN = 1
STARTING = "running"
TIMEOUT_DEFAULT = 180 # TODO: Consider removing the timeout feature
TIMEOUT_DEFAULT = 180 # TODO: Consider removing/renaming the timeout feature


class DM_WorkflowConnector(Device):
Expand Down Expand Up @@ -88,9 +84,9 @@ class DM_WorkflowConnector(Device):
"""

job = None # DM processing job (must update during workflow execution)
_api = None # DM common API
_api = None # DM processing API

owner = Component(Signal, value=DM_STATION_NAME, kind="config")
owner = Component(Signal, value="", kind="config")
workflow = Component(Signal, value="")
workflow_args = {}

Expand All @@ -101,7 +97,7 @@ class DM_WorkflowConnector(Device):
stage_id = Component(Signal, value=NOT_RUN_YET)
status = Component(Signal, value=NOT_RUN_YET)

polling_period = Component(Signal, value=0.1, kind="config")
polling_period = Component(Signal, value=POLLING_PERIOD_S, kind="config")
reporting_period = Component(Signal, value=REPORT_PERIOD_DEFAULT, kind="config")
concise_reporting = Component(Signal, value=True, kind="config")

Expand All @@ -127,9 +123,11 @@ def __init__(self, name=None, workflow=None, **kwargs):
if name is None:
raise KeyError("Must provide value for 'name'.")
super().__init__(name=name)

if workflow is not None:
self.workflow.put(workflow)
self.workflow_args.update(kwargs)
self.owner.put(self.api.username)

def put_if_different(self, signal, value):
"""Put ophyd signal only if new value is different."""
Expand Down Expand Up @@ -187,7 +185,12 @@ def report_status(self, t_offset=None):
self.report_processing_stages()

def start_workflow(self, workflow="", timeout=TIMEOUT_DEFAULT, **kwargs):
"""Kickoff a DM workflow with optional wait & timeout."""
"""
Kickoff a DM workflow with optional reporting timeout.

The reporting process will continue until the workflow ends or the
timeout period is exceeded. It does not affect the actual workflow.
"""
if workflow == "":
workflow = self.workflow.get()
else:
Expand Down Expand Up @@ -224,7 +227,11 @@ def _cleanup():

@run_in_thread
def _run_DM_workflow_thread():
logger.info("run DM workflow: %s with timeout=%s s", self.workflow.get(), timeout)
logger.info(
"run DM workflow: %s with reporting time limit=%s s",
self.workflow.get(),
timeout,
)
self.job = self.api.startProcessingJob(
workflowOwner=self.owner.get(),
workflowName=workflow,
Expand Down Expand Up @@ -266,7 +273,13 @@ def _run_DM_workflow_thread():
self.status.subscribe(_reporter)
_run_DM_workflow_thread()

def run_as_plan(self, workflow="", wait=True, timeout=TIMEOUT_DEFAULT, **kwargs):
def run_as_plan(
self,
workflow: str = "",
wait: bool = True,
timeout: int = TIMEOUT_DEFAULT,
**kwargs,
):
"""Run the DM workflow as a bluesky plan."""
from bluesky import plan_stubs as bps

Expand Down
5 changes: 1 addition & 4 deletions apstools/devices/lakeshore_controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@
from ophyd import Signal

from ..synApps import AsynRecord
from ..utils import HOUR
from . import PVPositionerSoftDoneWithStop

SECOND = 1
MINUTE = 60 * SECOND
HOUR = 60 * MINUTE


class LakeShore336_LoopControl(PVPositionerSoftDoneWithStop):
"""
Expand Down
7 changes: 3 additions & 4 deletions apstools/devices/tests/test_aps_data_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@
import pytest

from .. import DM_WorkflowConnector
from ..aps_data_management import DM_STATION_NAME

# from dm.common.exceptions.dmException import DmException


@pytest.mark.parametrize("wf_name", ["a_workflow_name"])
Expand All @@ -18,7 +15,7 @@ def test_object(wf_name):
assert wf is not None

assert wf.workflow.get() == wf_name
assert wf.owner.get() == DM_STATION_NAME
assert wf.owner.get() in ("", None)

try:
# Force a test that dm package can be imported.
Expand All @@ -35,6 +32,8 @@ def test_object(wf_name):
"Connection refused" in err_str
or
"invalid literal for int() with base 10" in err_str
or
"Invalid owner name provided" in err_str
)
# fmt: on

Expand Down
48 changes: 45 additions & 3 deletions apstools/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,35 @@
from ._core import TableStyle

from .aps_data_management import dm_setup

from .aps_data_management import build_run_metadata_dict
from .aps_data_management import dm_add_workflow
from .aps_data_management import dm_api_cat
from .aps_data_management import dm_api_daq
from .aps_data_management import dm_api_dataset_cat
from .aps_data_management import dm_api_ds
from .aps_data_management import dm_api_file
from .aps_data_management import dm_api_filecat
from .aps_data_management import dm_api_proc
from .aps_data_management import dm_file_ready_to_process
from .aps_data_management import dm_get_daqs
from .aps_data_management import dm_get_experiment_datadir_active_daq
from .aps_data_management import dm_get_experiment_file
from .aps_data_management import dm_get_experiment_path
from .aps_data_management import dm_get_experiments
from .aps_data_management import dm_get_workflow
from .aps_data_management import dm_source_environ
from .aps_data_management import dm_start_daq
from .aps_data_management import dm_station_name
from .aps_data_management import dm_stop_daq
from .aps_data_management import dm_update_workflow
from .aps_data_management import dm_upload
from .aps_data_management import get_workflow_last_stage
from .aps_data_management import share_bluesky_metadata_with_dm
from .aps_data_management import validate_experiment_dataDirectory
from .aps_data_management import wait_dm_upload
from .aps_data_management import DEFAULT_UPLOAD_TIMEOUT
from .aps_data_management import DEFAULT_UPLOAD_POLL_PERIOD
from .aps_data_management import DM_WorkflowCache
from .apsu_controls_subnet import warn_if_not_aps_controls_subnet

from .catalog import copy_filtered_catalog
from .catalog import findCatalogsInNamespace
from .catalog import getCatalog
Expand Down Expand Up @@ -63,6 +89,22 @@
from .spreadsheet import ExcelDatabaseFileBase
from .spreadsheet import ExcelDatabaseFileGeneric
from .spreadsheet import ExcelReadError
from .time_constants import DAY
from .time_constants import HOUR
from .time_constants import MINUTE
from .time_constants import SECOND
from .time_constants import WEEK
from .time_constants import ts2iso

# -----------------------------------------------------------------------------
# :author: Pete R. Jemian
# :email: jemian@anl.gov
# :copyright: (c) 2017-2024, UChicago Argonne, LLC
#
# Distributed under the terms of the Argonne National Laboratory Open Source License.
#
# The full license is in the file LICENSE.txt, distributed with this software.
# -----------------------------------------------------------------------------

# -----------------------------------------------------------------------------
# :author: Pete R. Jemian
Expand Down
Loading