Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SpecWriterCallback: write #O & #P lines (motor positions) #798

Merged
merged 14 commits into from
Feb 14, 2023
84 changes: 61 additions & 23 deletions apstools/callbacks/spec_file_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def myPlan():
import datetime
import getpass
import logging
import os
import pathlib
import socket
import time
from collections import OrderedDict
Expand Down Expand Up @@ -208,27 +208,31 @@ class SpecWriterCallback(object):
def __init__(self, filename=None, auto_write=True, RE=None, reset_scan_id=False):
self.clear()
self.buffered_comments = self._empty_comments_dict()
self.spec_filename = filename
self.auto_write = auto_write
self.uid_short_length = 8
self.write_file_header = False
self.write_new_header = False
self.spec_epoch = None # for both #E & #D line in header, also offset for all scans
self.spec_host = None
self.spec_user = None
self._datetime = None # most recent document time
self._motor_stream_name = "label_start_motor"
self._header_motor_keys = None
self._streams = {} # descriptor documents, keyed by uid
self.RE = RE

if reset_scan_id is True:
reset_scan_id = SCAN_ID_RESET_VALUE
self.reset_scan_id = reset_scan_id

if filename is None or not os.path.exists(filename):
self.newfile(filename)
if isinstance(filename, str):
filename = pathlib.Path(filename)
if filename is None or not filename.exists():
filename = self.newfile(filename)
else:
max_scan_id = self.usefile(filename)
if RE is not None and reset_scan_id is not False:
RE.md["scan_id"] = max_scan_id
self.spec_filename = filename

def clear(self):
"""reset all scan data defaults"""
Expand Down Expand Up @@ -330,7 +334,7 @@ def start(self, doc):
# Can this be omitted?
self.T_or_M = None # for now
# self.T_or_M = "T" # TODO: how to get this from the document stream?
# self.T_or_M_value = 1
self.T_or_M_value = 1
# self._cmt("start", "!!! #T line not correct yet !!!")

# metadata
Expand Down Expand Up @@ -366,6 +370,13 @@ def descriptor(self, doc):
# referenced by event and bulk_events documents
self._streams[doc["uid"]] = doc

if doc["name"] == self._motor_stream_name:
# list of all known positioners (motors)
mlist = sorted(doc["object_keys"].keys())
if self._header_motor_keys != mlist:
self.write_new_header = True
self.positioners = {k: None for k in mlist}
return
if doc["name"] != "primary":
return

Expand Down Expand Up @@ -395,11 +406,14 @@ def event(self, doc):
"""
handle *event* documents
"""
stream_doc = self._streams.get(doc["descriptor"])
if stream_doc is None:
descriptor = self._streams.get(doc["descriptor"])
if descriptor is None:
fmt = "descriptor UID {} not found"
raise KeyError(fmt.format(doc["descriptor"]))
if stream_doc["name"] == "primary":
if descriptor["name"] == self._motor_stream_name:
for k in self.positioners.keys():
self.positioners[k] = doc["data"][k] # get motor values
elif descriptor["name"] == "primary":
for k in doc["data"].keys():
if k not in self.data.keys():
msg = f"unexpected failure here, key {k} not found"
Expand Down Expand Up @@ -466,7 +480,17 @@ def prepare_scan_contents(self):
# "#MD" is our ad hoc SPEC data tag
lines.append(f"#MD {k} = {v}")

lines.append("#P0 ")
if sorted(self.positioners.keys()) != self._header_motor_keys:
self.write_new_header = True
if len(self.positioners) == 0:
lines.append("#P0 ")
else:
values = list(self.positioners.values())
r = 0
while len(values) > 0:
lines.append(f"#P{r} " + " ".join([str(v) for v in values[:8]]))
values = values[8:]
r += 1

lines.append("#N " + str(len(self.data.keys())))
if len(self.data.keys()) > 0:
Expand Down Expand Up @@ -510,21 +534,34 @@ def _write_lines_(self, lines, mode="a"):
f.write("\n".join(lines))

def write_header(self):
"""write the header section of a SPEC data file"""
"""Write the (initial) header section of a SPEC data file."""
dt = datetime.datetime.fromtimestamp(self.spec_epoch)
lines = []
# Ok to repeat #F in addtional header sections
lines.append(f"#F {self.spec_filename}")
lines.append(f"#E {self.spec_epoch}")
lines.append(f"#D {datetime.datetime.strftime(dt, SPEC_TIME_FORMAT)}")
lines.append(f"#C Bluesky user = {self.spec_user} host = {self.spec_host}")
lines.append("#O0 ")
lines.append("#o0 ")
self._header_motor_keys = sorted(self.positioners.keys())
if len(self._header_motor_keys) == 0:
lines.append("#O0 ") # names
lines.append("#o0 ") # mnemonics
else:
delimiter = " " * 2 # two spaces between names
for pre in "#O #o".split(): # same list for names and mnemonics
values = self._header_motor_keys
r = 0
while len(values) > 0:
lines.append(f"{pre}{r} " + delimiter.join([str(v) for v in values[:8]]))
values = values[8:]
r += 1

lines.append("")

if os.path.exists(self.spec_filename):
if self.spec_filename.exists():
lines.insert(0, "")
self._write_lines_(lines, mode="a+")
self.write_file_header = False
self.write_new_header = False

def write_scan(self):
"""
Expand All @@ -536,7 +573,7 @@ def write_scan(self):

note: does nothing if there are no lines to be written
"""
if os.path.exists(self.spec_filename):
if self.spec_filename.exists():
with open(self.spec_filename) as f:
buf = f.read()
if buf.find(self.uid) >= 0:
Expand All @@ -547,7 +584,7 @@ def write_scan(self):
lines = self.prepare_scan_contents()
lines.append("")
if lines is not None:
if self.write_file_header:
if self.write_new_header:
self.write_header()
logger.info("wrote header to SPEC file: %s", self.spec_filename)
self._write_lines_(lines, mode="a")
Expand All @@ -560,7 +597,8 @@ def write_scan(self):
def make_default_filename(self):
"""generate a file name to be used as default"""
now = datetime.datetime.now()
return datetime.datetime.strftime(now, "%Y%m%d-%H%M%S") + ".dat"
filename = datetime.datetime.strftime(now, "%Y%m%d-%H%M%S") + ".dat"
return pathlib.Path(filename)

def newfile(self, filename=None, scan_id=None, RE=None):
"""
Expand All @@ -570,7 +608,7 @@ def newfile(self, filename=None, scan_id=None, RE=None):
"""
self.clear()
filename = filename or self.make_default_filename()
if os.path.exists(filename):
if filename.exists():
from spec2nexus.spec import SpecDataFile

sdf = SpecDataFile(filename)
Expand All @@ -583,7 +621,7 @@ def newfile(self, filename=None, scan_id=None, RE=None):
self.spec_epoch = int(time.time()) # ! no roundup here!!!
self.spec_host = socket.gethostname() or "localhost"
self.spec_user = getpass.getuser() or "BlueskyUser"
self.write_file_header = True # don't write the file yet
self.write_new_header = True # don't write the file yet

# backwards-compatibility
if isinstance(scan_id, bool):
Expand All @@ -599,7 +637,7 @@ def newfile(self, filename=None, scan_id=None, RE=None):

def usefile(self, filename):
"""read from existing SPEC data file"""
if not os.path.exists(self.spec_filename):
if not self.spec_filename.exists():
raise IOError(f"file {filename} does not exist")
scan_id = None
with open(filename, "r") as f:
Expand Down Expand Up @@ -667,8 +705,8 @@ def spec_comment(comment, doc=None, writer=None):
Instance of ``SpecWriterCallback()``,
typically: ``specwriter = SpecWriterCallback()``
"""
global specwriter # such as: specwriter = SpecWriterCallback()
writer = writer or specwriter # FIXME: get from namespace
# global specwriter # such as: specwriter = SpecWriterCallback()
# writer = writer or specwriter # FIXME: get from namespace
if doc is None:
if writer.scanning:
doc = "event"
Expand Down
124 changes: 85 additions & 39 deletions apstools/callbacks/tests/test_440_specwriter.py
Original file line number Diff line number Diff line change
@@ -1,61 +1,102 @@
"""
test issue #440: specwriter

# https://github.com/BCDA-APS/apstools/issues/440
"""

import os
import pathlib
import tempfile
import zipfile

import intake
import numpy as np
import pytest
import yaml

from .. import SpecWriterCallback
from ..spec_file_writer import _rebuild_scan_command

DATA_ARCHIVE = "440_specwriter_problem_run.zip"

PATH = os.path.dirname(__file__)
FULL_ZIP_FILE = os.path.join(PATH, DATA_ARCHIVE)

TMP_CATALOG = os.path.join(
"/tmp", DATA_ARCHIVE.split(".")[0], "catalog.yml"
)


def test_setup_comes_first():
assert os.path.exists(FULL_ZIP_FILE)

with zipfile.ZipFile(FULL_ZIP_FILE, "r") as zip_ref:
zip_ref.extractall("/tmp")
CAT_NAME = "packed_catalog"
DATA_ARCHIVE = "440_specwriter_problem_run"
FULL_ZIP_FILE = pathlib.Path(__file__).parent / f"{DATA_ARCHIVE}.zip"
UID = "624e776a-a914-4a74-8841-babf1591fb29"

assert os.path.exists(TMP_CATALOG)

@pytest.fixture(scope="function")
def tempdir():
tempdir = tempfile.mkdtemp()
yield pathlib.Path(tempdir)

def test_confirm_run_exists():
assert os.path.exists(TMP_CATALOG)

cat = intake.open_catalog(TMP_CATALOG)
assert "packed_catalog" in cat
@pytest.fixture(scope="function")
def catalog():
assert FULL_ZIP_FILE.exists()

cat = cat["packed_catalog"]
assert len(cat) == 1
assert "624e776a-a914-4a74-8841-babf1591fb29" in cat
tempdir = pathlib.Path(tempfile.mkdtemp())
assert tempdir.exists()


def test_specwriter():
# The problem does not appear when using data from the databroker.
# Verify that is the case now.
os.chdir("/tmp")
specfile = "issue240.spec"
if os.path.exists(specfile):
os.remove(specfile)
with zipfile.ZipFile(FULL_ZIP_FILE, "r") as zip_ref:
zip_ref.extractall(str(tempdir))

catalog = tempdir / DATA_ARCHIVE / "catalog.yml"
cat_dir = catalog.parent
assert cat_dir.exists(), f"{catalog.parent=}"
assert catalog.exists(), f"{catalog=}"
assert (cat_dir / "documents_manifest.txt").exists()
assert (cat_dir / "documents").exists()
assert (cat_dir / "documents" / f"{UID}.msgpack").exists()

# edit the catalog file for the installed path
details = yaml.load(open(catalog, "r").read(), Loader=yaml.Loader)
args = details["sources"][CAT_NAME]["args"]
args["paths"] = [args["paths"][0].replace("/tmp/", f"{tempdir}/")]
with open(catalog, "w") as f:
f.write(yaml.dump(details, indent=2))

yield catalog


def test_confirm_run_exists(catalog):
cat = intake.open_catalog(catalog)
assert len(cat) > 0, f"{catalog=} {cat=}"
assert CAT_NAME in cat, f"{catalog=} {cat=}"
assert isinstance(cat, intake.Catalog), f"{type(cat)=} {dir(cat)=}"
assert isinstance(cat, intake.catalog.Catalog), f"{type(cat)=} {dir(cat)=}"
assert isinstance(cat, intake.catalog.local.Catalog), f"{type(cat)=} {dir(cat)=}"
assert isinstance(cat, intake.catalog.local.YAMLFileCatalog), f"{type(cat)=} {dir(cat)=}"
assert cat.name == DATA_ARCHIVE

cat = cat[CAT_NAME]
assert isinstance(cat, intake.Catalog), f"{type(cat)=} {dir(cat)=}"
assert isinstance(cat, intake.catalog.Catalog), f"{type(cat)=} {dir(cat)=}"
assert isinstance(cat, intake.catalog.local.Catalog), f"{type(cat)=} {dir(cat)=}"
assert not isinstance(cat, intake.catalog.local.YAMLFileCatalog), f"{type(cat)=} {dir(cat)=}"
assert "msgpack.BlueskyMsgpackCatalog" in str(type(cat))
assert cat.name == CAT_NAME
assert len(cat) > 0, f"{catalog=} {cat=} {dir(cat)=}"

# finally, confirm the expected run exists
assert UID in cat, f"{catalog=} {cat=} {dir(cat)=}"


def test_specwriter_replay(tempdir, catalog):
# https://github.com/BCDA-APS/apstools/issues/440
# The #440 problem does not appear when using data from the databroker.
# This test will verify that is still the case now.
pathlib.os.chdir(tempdir)
specfile = pathlib.Path("issue240.spec")
if specfile.exists():
specfile.unlink() # remove existing file
specwriter = SpecWriterCallback()
specwriter.newfile(specfile)
db = intake.open_catalog(TMP_CATALOG)["packed_catalog"].v1
h = db[-1]
for key, doc in db.get_documents(h):

cat = intake.open_catalog(catalog)[CAT_NAME].v2
assert len(cat) > 0, f"{catalog=} {cat=}"
h = cat.v1[-1]
for key, doc in cat.v1.get_documents(h):
specwriter.receiver(key, doc)
assert "relative_energy" not in doc
assert os.path.exists(specwriter.spec_filename)
assert specwriter.spec_filename.exists()

with open(specwriter.spec_filename, "r") as f:
line = ""
Expand All @@ -68,14 +109,19 @@ def test_specwriter():
line = f.readline()
assert line.startswith("#D ")

# The problem comes up if one of the arguments is a numpy.array.

def test_specwriter_numpy_array(tempdir, catalog):
# The #440 problem comes up if one of the arguments is a numpy.array.
# So we must replay the document stream and modify the right
# structure as it passes by.
# This structure is in the start document, which is first.
# Note: we don't have to write the whole SPEC file again,
# just test if _rebuild_scan_command(start_doc) is one line.

hh = db.get_documents(h)
cat = intake.open_catalog(catalog)[CAT_NAME].v2
assert len(cat) > 0, f"{catalog=} {cat=}"

hh = cat.v1.get_documents(cat.v1[-1])
key, doc = next(hh)
arr = doc["plan_args"]["qx_setup"]["relative_energy"]
assert isinstance(arr, list)
Expand All @@ -90,4 +136,4 @@ def test_specwriter():
# modify the start doc
doc["plan_args"]["qx_setup"]["relative_energy"] = arr
cmd = _rebuild_scan_command(doc) # FIXME: <-----
assert len(cmd.strip().splitlines()) == 1
assert len(cmd.strip().splitlines()) == 1, f"{cmd=}"
Loading