Derive PandA describe from datasets PV (#369)
* Derive PandA describe from datasets PV

* Fix resource kwargs inconsistencies with nexus writer
callumforrester committed Jun 17, 2024
1 parent 299b4bd commit febf9ce
Showing 7 changed files with 147 additions and 237 deletions.
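
The heart of the change: instead of crawling the device tree for `*_capture` signals, the writer now reads the PandA's own `datasets` PV and derives its describe output from that table. A minimal sketch of the new flow, assuming a connected `HDFPanda` called `panda` (the helper name is hypothetical):

    from typing import Dict
    from bluesky.protocols import DataKey

    async def describe_from_datasets_pv(panda) -> Dict[str, DataKey]:
        # The datasets PV pairs each dataset name with its HDF5 dtype
        capture_table = await panda.data.datasets.get_value()
        return {
            name: DataKey(
                source=panda.data.hdf_directory.source,
                shape=[1],  # all PandA datasets are scalar
                dtype="number",
                external="STREAM:",
            )
            for name in capture_table["name"]
        }
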
3 changes: 2 additions & 1 deletion src/ophyd_async/panda/_common_blocks.py
@@ -3,7 +3,7 @@
from enum import Enum

from ophyd_async.core import Device, DeviceVector, SignalR, SignalRW
from ophyd_async.panda._table import SeqTable
from ophyd_async.panda._table import DatasetTable, SeqTable


class DataBlock(Device):
@@ -14,6 +14,7 @@ class DataBlock(Device):
num_captured: SignalR[int]
capture: SignalRW[bool]
flush_period: SignalRW[float]
datasets: SignalR[DatasetTable]


class PulseBlock(Device):
2 changes: 0 additions & 2 deletions src/ophyd_async/panda/_hdf_panda.py
@@ -28,9 +28,7 @@ def __init__(
create_children_from_annotations(self)
controller = PandaPcapController(pcap=self.pcap)
writer = PandaHDFWriter(
prefix=prefix,
directory_provider=directory_provider,
name_provider=lambda: name,
panda_device=self,
)
super().__init__(
10 changes: 10 additions & 0 deletions src/ophyd_async/panda/_table.py
@@ -6,6 +6,16 @@
import numpy.typing as npt


class PandaHdf5DatasetType(str, Enum):
FLOAT_64 = "float64"
UINT_32 = "uint32"


class DatasetTable(TypedDict):
name: npt.NDArray[np.str_]
hdf5_type: Sequence[PandaHdf5DatasetType]


class SeqTrigger(str, Enum):
IMMEDIATE = "Immediate"
BITA_0 = "BITA=0"
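
The new `DatasetTable` mirrors that PV: a column of dataset names alongside a column of HDF5 dtypes. Building one by hand, as the tests at the bottom of this commit do (example values):

    import numpy as np
    from ophyd_async.panda._table import DatasetTable, PandaHdf5DatasetType

    # Dataset "x" is stored as uint32, "y" as float64
    table = DatasetTable(
        name=np.array(["x", "y"]),
        hdf5_type=[
            PandaHdf5DatasetType.UINT_32,
            PandaHdf5DatasetType.FLOAT_64,
        ],
    )
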
127 changes: 22 additions & 105 deletions src/ophyd_async/panda/writers/_hdf_writer.py
@@ -1,107 +1,32 @@
import asyncio
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, AsyncGenerator, AsyncIterator, Dict, List, Optional
from typing import AsyncGenerator, AsyncIterator, Dict, List, Optional

from bluesky.protocols import DataKey, StreamAsset
from p4p.client.thread import Context

from ophyd_async.core import (
DEFAULT_TIMEOUT,
DetectorWriter,
Device,
DirectoryProvider,
NameProvider,
SignalR,
wait_for_value,
)
from ophyd_async.core.signal import observe_value
from ophyd_async.panda import CommonPandaBlocks

from .._common_blocks import CommonPandaBlocks
from ._panda_hdf_file import _HDFDataset, _HDFFile


class Capture(str, Enum):
# Capture signals for the HDF Panda
No = "No"
Value = "Value"
Diff = "Diff"
Sum = "Sum"
Mean = "Mean"
Min = "Min"
Max = "Max"
MinMax = "Min Max"
MinMaxMean = "Min Max Mean"


def get_capture_signals(
block: Device, path_prefix: Optional[str] = ""
) -> Dict[str, SignalR]:
"""Get dict mapping a capture signal's name to the signal itself"""
if not path_prefix:
path_prefix = ""
signals: Dict[str, SignalR[Any]] = {}
for attr_name, attr in block.children():
# Capture signals end in _capture, but num_capture is a red herring
if attr_name == "num_capture":
continue
dot_path = f"{path_prefix}{attr_name}"
if isinstance(attr, SignalR) and attr_name.endswith("_capture"):
signals[dot_path] = attr
attr_signals = get_capture_signals(attr, path_prefix=dot_path + ".")
signals.update(attr_signals)
return signals


@dataclass
class CaptureSignalWrapper:
signal: SignalR
capture_type: Capture


# This should return a dictionary which contains a dict, containing the Capture
# signal object, and the value of that signal
async def get_signals_marked_for_capture(
capture_signals: Dict[str, SignalR],
) -> Dict[str, CaptureSignalWrapper]:
# Read signals to see if they should be captured
do_read = [signal.get_value() for signal in capture_signals.values()]

signal_values = await asyncio.gather(*do_read)

assert len(signal_values) == len(
capture_signals
), "Length of read signals are different to length of signals"

signals_to_capture: Dict[str, CaptureSignalWrapper] = {}
for signal_path, signal_object, signal_value in zip(
capture_signals.keys(), capture_signals.values(), signal_values
):
signal_path = signal_path.replace("_capture", "")
if (signal_value in iter(Capture)) and (signal_value != Capture.No):
signals_to_capture[signal_path] = CaptureSignalWrapper(
signal_object,
signal_value,
)

return signals_to_capture


class PandaHDFWriter(DetectorWriter):
_ctxt: Optional[Context] = None

def __init__(
self,
prefix: str,
directory_provider: DirectoryProvider,
name_provider: NameProvider,
panda_device: CommonPandaBlocks,
) -> None:
self.panda_device = panda_device
self._prefix = prefix
self._directory_provider = directory_provider
self._name_provider = name_provider
self._datasets: List[_HDFDataset] = []
self._file: Optional[_HDFFile] = None
self._multiplier = 1
@@ -110,14 +35,9 @@ def __init__(
async def open(self, multiplier: int = 1) -> Dict[str, DataKey]:
"""Retrieve and get descriptor of all PandA signals marked for capture"""

# Get capture PVs by looking at panda. Gives mapping of dotted attribute path
# to Signal object
self.capture_signals = get_capture_signals(self.panda_device)

# Ensure flushes are immediate
await self.panda_device.data.flush_period.set(0)

to_capture = await get_signals_marked_for_capture(self.capture_signals)
self._file = None
info = self._directory_provider()
# Set the initial values
@@ -133,36 +53,21 @@ async def open(self, multiplier: int = 1) -> Dict[str, DataKey]:

# Wait for it to start, stashing the status that tells us when it finishes
await self.panda_device.data.capture.set(True)
name = self._name_provider()
if multiplier > 1:
raise ValueError(
"All PandA datasets should be scalar, multiplier should be 1"
)
self._datasets = []
for attribute_path, capture_signal in to_capture.items():
split_path = attribute_path.split(".")
signal_name = split_path[-1]
# Get block names from numbered blocks, eg INENC[1]
block_name = (
f"{split_path[-3]}{split_path[-2]}"
if split_path[-2].isnumeric()
else split_path[-2]
)

for suffix in capture_signal.capture_type.split(" "):
self._datasets.append(
_HDFDataset(
name,
block_name,
f"{name}-{block_name}-{signal_name}-{suffix}",
f"{block_name}-{signal_name}".upper() + f"-{suffix}",
[1],
multiplier=1,
)
)
return await self._describe()

async def _describe(self) -> Dict[str, DataKey]:
"""
Return a describe based on the datasets PV
"""

await self._update_datasets()
describe = {
ds.name: DataKey(
ds.data_key: DataKey(
source=self.panda_device.data.hdf_directory.source,
shape=ds.shape,
dtype="array" if ds.shape != [1] else "number",
@@ -172,6 +77,18 @@ async def open(self, multiplier: int = 1) -> Dict[str, DataKey]:
}
return describe

async def _update_datasets(self) -> None:
"""
Load data from the datasets PV on the panda, update internal
representation of datasets that the panda will write.
"""

capture_table = await self.panda_device.data.datasets.get_value()
self._datasets = [
_HDFDataset(dataset_name, "/" + dataset_name, [1], multiplier=1)
for dataset_name in capture_table["name"]
]

# Next few functions are exactly the same as AD writer. Could move as default
# StandardDetector behavior
async def wait_for_index(
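
For the two-row example table above, `_update_datasets` reduces to the following worked expansion (dataset names taken from the tests below; `_HDFDataset` is defined in the next file):

    from ophyd_async.panda.writers._panda_hdf_file import _HDFDataset

    # Each table row becomes one scalar dataset rooted at "/<name>"
    datasets = [
        _HDFDataset("x", "/x", [1], multiplier=1),
        _HDFDataset("y", "/y", [1], multiplier=1),
    ]
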
12 changes: 4 additions & 8 deletions src/ophyd_async/panda/writers/_panda_hdf_file.py
@@ -9,10 +9,8 @@

@dataclass
class _HDFDataset:
device_name: str
block: str
name: str
path: str
data_key: str
internal_path: str
shape: List[int]
multiplier: int

@@ -29,12 +27,10 @@ def __init__(
compose_stream_resource(
spec="AD_HDF5_SWMR_SLICE",
root=str(directory_info.root),
data_key=ds.name,
data_key=ds.data_key,
resource_path=(f"{str(directory_info.root)}/{full_file_name}"),
resource_kwargs={
"name": ds.name,
"block": ds.block,
"path": ds.path,
"path": ds.internal_path,
"multiplier": ds.multiplier,
"timestamps": "/entry/instrument/NDAttributes/NDArrayTimeStamp",
},
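
With these kwargs, each emitted stream resource now carries only the in-file path, the multiplier and the timestamp dataset, which is what the nexus writer expects (the second bullet of the commit message). For dataset "x" the resource document comes out roughly like this (a sketch; the root and file name are assumed):

    # One StreamResource document (uid/run_start fields elided)
    {
        "spec": "AD_HDF5_SWMR_SLICE",
        "root": "/data",                    # from the directory provider (assumed)
        "resource_path": "/data/panda.h5",  # assumed file name
        "data_key": "x",
        "resource_kwargs": {
            "path": "/x",
            "multiplier": 1,
            "timestamps": "/entry/instrument/NDAttributes/NDArrayTimeStamp",
        },
    }
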
46 changes: 22 additions & 24 deletions tests/panda/test_hdf_panda.py
@@ -1,5 +1,6 @@
from typing import Dict

import numpy as np
import pytest
from bluesky import plan_stubs as bps
from bluesky.run_engine import RunEngine
@@ -9,9 +10,8 @@
from ophyd_async.core.flyer import HardwareTriggeredFlyable
from ophyd_async.core.mock_signal_utils import callback_on_mock_put
from ophyd_async.core.signal import SignalR, assert_emitted
from ophyd_async.epics.signal.signal import epics_signal_r
from ophyd_async.panda import HDFPanda, StaticSeqTableTriggerLogic
from ophyd_async.panda.writers._hdf_writer import Capture
from ophyd_async.panda._table import DatasetTable, PandaHdf5DatasetType
from ophyd_async.plan_stubs import (
prepare_static_seq_table_flyer_and_detectors_with_same_trigger,
)
@@ -26,25 +26,28 @@ class CaptureBlock(Device):
mock_hdf_panda = HDFPanda(
"HDFPANDA:", directory_provider=directory_provider, name="panda"
)
block_a = CaptureBlock(name="block_a")
block_b = CaptureBlock(name="block_b")
block_a.test_capture = epics_signal_r(
Capture, "pva://test_capture_a", name="test_capture_a"
)
block_b.test_capture = epics_signal_r(
Capture, "pva://test_capture_b", name="test_capture_b"
)

setattr(mock_hdf_panda, "block_a", block_a)
setattr(mock_hdf_panda, "block_b", block_b)
await mock_hdf_panda.connect(mock=True)

def link_function(value, **kwargs):
set_mock_value(mock_hdf_panda.pcap.active, value)

callback_on_mock_put(mock_hdf_panda.pcap.arm, link_function)
set_mock_value(block_a.test_capture, Capture.Min)
set_mock_value(block_b.test_capture, Capture.Diff)

set_mock_value(
mock_hdf_panda.data.datasets,
DatasetTable(
name=np.array(
[
"x",
"y",
]
),
hdf5_type=[
PandaHdf5DatasetType.UINT_32,
PandaHdf5DatasetType.FLOAT_64,
],
),
)

yield mock_hdf_panda

@@ -127,28 +130,23 @@ def flying_plan():

# test descriptor
data_key_names: Dict[str, str] = docs["descriptor"][0]["object_keys"]["panda"]
assert data_key_names == [
"panda-block_a-test-Min",
"panda-block_b-test-Diff",
]
assert data_key_names == ["x", "y"]
for data_key_name in data_key_names:
assert (
docs["descriptor"][0]["data_keys"][data_key_name]["source"]
== "mock+soft://panda-data-hdf_directory"
)

# test stream resources
for block_letter, stream_resource, data_key_name in zip(
("a", "b"), docs["stream_resource"], data_key_names
for dataset_name, stream_resource, data_key_name in zip(
("x", "y"), docs["stream_resource"], data_key_names
):
assert stream_resource["data_key"] == data_key_name
assert stream_resource["spec"] == "AD_HDF5_SWMR_SLICE"
assert stream_resource["run_start"] == docs["start"][0]["uid"]
assert stream_resource["resource_kwargs"] == {
"block": f"block_{block_letter}",
"path": "/" + dataset_name,
"multiplier": 1,
"name": data_key_name,
"path": f"BLOCK_{block_letter.upper()}-TEST-{data_key_name.split('-')[-1]}",
"timestamps": "/entry/instrument/NDAttributes/NDArrayTimeStamp",
}

