Skip to content

Commit

Permalink
Merge c047fc7 into cc5f4f8
Browse files Browse the repository at this point in the history
  • Loading branch information
prjemian committed Aug 5, 2019
2 parents cc5f4f8 + c047fc7 commit 594ede5
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 6 deletions.
21 changes: 18 additions & 3 deletions apstools/utils.py
Expand Up @@ -334,7 +334,7 @@ def pairwise(iterable):
return zip(a, a)


def replay(headers, callback=None):
def replay(headers, callback=None, sort=True):
"""
replay the document stream from one (or more) scans (headers)
Expand All @@ -348,7 +348,11 @@ def replay(headers, callback=None):
callback: scan or [scan]
The Bluesky callback to handle the stream of documents from a scan.
If `None`, then use the `bec` (BestEffortCallback) from the IPython shell.
(default:`None`)
(default:``None``)
sort: bool
Sort the headers chronologically if True.
(default:``True``)
*new in apstools release 1.1.11*
"""
Expand All @@ -359,7 +363,18 @@ def replay(headers, callback=None):
_headers = headers # do not mutate the input arg
if isinstance(_headers, databroker.Header):
_headers = [_headers]
for h in _headers:

def time_sorter(run): # by increasing time
return run.start["time"]

sequence = list(_headers) # for sequence_sorter
def sequence_sorter(run): # by sequence as-given
v = sequence.index(run)
return v

sorter = {True: time_sorter, False: sequence_sorter}[sort]

for h in sorted(_headers, key=sorter):
if not isinstance(h, databroker.Header):
emsg = f"Must be a databroker Header: received: {type(h)}: |{h}|"
raise TypeError(emsg)
Expand Down
33 changes: 30 additions & 3 deletions tests/test_utils.py
Expand Up @@ -210,16 +210,43 @@ def test_list_recent_scans(self):

def test_replay(self):
replies = []
def my_cb(key, doc):
def cb1(key, doc):
replies.append((key, len(doc)))
APS_utils.replay(self.db(plan_name="count"), callback=my_cb)

APS_utils.replay(self.db(plan_name="count"), callback=cb1)
self.assertGreater(len(replies), 0)
keys = set([v[0] for v in replies])
for item in "start stop event descriptor datum resource".split():
msg = f"{item} not in {keys}"
self.assertIn(item, keys, msg)

def cb2(key, doc):
if key == "start":
replies.append(doc["time"])

replies = []
APS_utils.replay(self.db[-1], callback=cb2)
self.assertEqual(len(replies), 1, "most recent run")

replies = []
APS_utils.replay(self.db[-3:], callback=cb2)
self.assertEqual(len(replies), 3, "most recent 3 runs")

replies = []
APS_utils.replay(self.db(plan_name="count"), callback=cb2)
previous = 0
for i, v in enumerate(replies):
msg = f"expect increasing: #{i} : change={v-previous}"
self.assertGreater(v - previous, 0, msg)
previous = v

replies = []
APS_utils.replay(self.db(plan_name="count"), callback=cb2, sort=False)
previous = replies[0]+1
for i, v in enumerate(replies):
msg = f"expect decreasing: #{i} : change={v-previous}"
self.assertLess(v - previous, 0, msg)
previous = v


def suite(*args, **kw):
test_list = [
Expand Down

0 comments on commit 594ede5

Please sign in to comment.