Skip to content


Merge b086861 into 0769f87
Browse files Browse the repository at this point in the history
  • Loading branch information
MDecarabas committed Apr 8, 2024
2 parents 0769f87 + b086861 commit 608f244
Showing 1 changed file with 237 additions and 0 deletions.
237 changes: 237 additions & 0 deletions apstools/plans/
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
from bluesky import plan_stubs as bps
from bluesky import utils
from bluesky import plan_patterns
from bluesky import preprocessors as bpp

import inspect
from itertools import zip_longest
from collections import defaultdict
import os

# cytools is a drop-in replacement for toolz, implemented in Cython
from cytools import partition
except ImportError:
from toolz import partition

def mesh_list_grid_scan(detectors, *args, number_of_collection_points, snake_axes=False, per_step=None, md=None):
Scan over a mesh; each motor is on an independent trajectory.
detectors: list
list of 'readable' objects
args: list
patterned like (``motor1, position_list1,``
``motor2, position_list2,``
``motor3, position_list3,``
``motorN, position_listN``)
The first motor is the "slowest", the outer loop. ``position_list``'s
are lists of positions, all lists must have the same length. Motors
can be any 'settable' object (motor, temp controller, etc.).
number_of_collection_points: int
The total number of collection points that must be collected within the
grid until the scan is ready to stop.
snake_axes: boolean or iterable, optional
which axes should be snaked, either ``False`` (do not snake any axes),
``True`` (snake all axes) or a list of axes to snake. "Snaking" an axis
is defined as following snake-like, winding trajectory instead of a
simple left-to-right trajectory.The elements of the list are motors
that are listed in `args`. The list must not contain the slowest
(first) motor, since it can't be snaked.
per_step: callable, optional
hook for customizing action of inner loop (messages per step).
See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default)
for details.
md: dict, optional
See Also

full_cycler = plan_patterns.outer_list_product(args, snake_axes)

md_args = []
motor_names = []
motors = []
for i, (motor, pos_list) in enumerate(partition(2, args)):
md_args.extend([repr(motor), pos_list])
# print("when")
_md = {
"shape": tuple(len(pos_list) for motor, pos_list in partition(2, args)),
"extents": tuple([min(pos_list), max(pos_list)] for motor, pos_list in partition(2, args)),
"snake_axes": snake_axes,
"plan_args": {"detectors": list(map(repr, detectors)), "args": md_args, "per_step": repr(per_step)},
"plan_name": "list_grid_scan",
"plan_pattern": "outer_list_product",
"plan_pattern_args": dict(args=md_args, snake_axes=snake_axes),
"plan_pattern_module": plan_patterns.__name__,
"motors": tuple(motor_names),
"hints": {},
_md.update(md or {})
_md["hints"].setdefault("dimensions", [(m.hints["fields"], "primary") for m in motors])
except (AttributeError, KeyError):

return (
yield from mesh_scan_nd(detectors, full_cycler, number_of_collection_points, per_step=per_step, md=_md)

def mesh_scan_nd(detectors, cycler, number_of_collection_points, *, per_step=None, md=None):
Scan over an arbitrary N-dimensional trajectory.
detectors : list
cycler : Cycler
cycler.Cycler object mapping movable interfaces to positions
number_of_collection_points: int
The total number of collection points that must be collected within the
grid until the scan is ready to stop.
per_step : callable, optional
hook for customizing action of inner loop (messages per step).
See docstring of :func:`bluesky.plan_stubs.one_nd_step` (the default)
for details.
md : dict, optional
See Also
>>> from cycler import cycler
>>> cy = cycler(motor1, [1, 2, 3]) * cycler(motor2, [4, 5, 6])
>>> scan_nd([sensor], cy)
_md = {
"detectors": [ for det in detectors],
"motors": [ for motor in cycler.keys],
"num_points": len(cycler),
"num_intervals": len(cycler) - 1,
"plan_args": {"detectors": list(map(repr, detectors)), "cycler": repr(cycler), "per_step": repr(per_step)},
"plan_name": "scan_nd",
"hints": {},
_md.update(md or {})
dimensions = [(motor.hints["fields"], "primary") for motor in cycler.keys]
except (AttributeError, KeyError):
# Not all motors provide a 'fields' hint, so we have to skip it.
# We know that hints exists. Either:
# - the user passed it in and we are extending it
# - the user did not pass it in and we got the default {}
# If the user supplied hints includes a dimension entry, do not
# change it, else set it to the one generated above
_md["hints"].setdefault("dimensions", dimensions)

predeclare = per_step is None and os.environ.get("BLUESKY_PREDECLARE", False)
if per_step is None:
per_step = bps.one_nd_step
# Ensure that the user-defined per-step has the expected signature.
sig = inspect.signature(per_step)

def _verify_1d_step(sig):
if len(sig.parameters) < 3:
return False
for name, (p_name, p) in zip_longest(["detectors", "motor", "step"], sig.parameters.items()):
# this is one of the first 3 positional arguements, check that the name matches
if name is not None:
if name != p_name:
return False
# if there are any extra arguments, check that they have a default
if p.kind is p.VAR_KEYWORD or p.kind is p.VAR_POSITIONAL:
if p.default is p.empty:
return False

return True

def _verify_nd_step(sig):
if len(sig.parameters) < 3:
return False
for name, (p_name, p) in zip_longest(["detectors", "step", "pos_cache"], sig.parameters.items()):
# this is one of the first 3 positional arguements, check that the name matches
if name is not None:
if name != p_name:
return False
# if there are any extra arguments, check that they have a default
if p.kind is p.VAR_KEYWORD or p.kind is p.VAR_POSITIONAL:
if p.default is p.empty:
return False

return True

if sig == inspect.signature(bps.one_nd_step):
elif _verify_nd_step(sig):
# check other signature for back-compatibility
elif _verify_1d_step(sig):
# Accept this signature for back-compat reasons (because
# inner_product_scan was renamed scan).
dims = len(list(cycler.keys))
if dims != 1:
raise TypeError(
"Signature of per_step assumes 1D trajectory " "but {} motors are specified.".format(dims)
(motor,) = cycler.keys
user_per_step = per_step

def adapter(detectors, step, pos_cache):
# one_nd_step 'step' parameter is a dict; one_id_step 'step'
# parameter is a value
(step,) = step.values()
return (yield from user_per_step(detectors, motor, step))

per_step = adapter
raise TypeError(
"per_step must be a callable with the signature \n "
"<Signature (detectors, step, pos_cache)> or "
"<Signature (detectors, motor, step)>. \n"
"per_step signature received: {}".format(sig)
pos_cache = defaultdict(lambda: None) # where last position is stashed
cycler = utils.merge_cycler(cycler)
motors = list(cycler.keys)

@bpp.stage_decorator(list(detectors) + motors)
def scan_until_completion():
Scanning until the total number of required collection points is achieved
if predeclare:
yield from bps.declare_stream(*motors, *detectors, name="primary")
iterations = 0
while iterations < number_of_collection_points:
for step in list(cycler):
yield from per_step(detectors, step, pos_cache)
iterations += 1
if iterations == number_of_collection_points:

return (yield from scan_until_completion())

0 comments on commit 608f244

Please sign in to comment.