# sscan as Flyer

## 1D step scans using sscan record

Use the [sscan record](https://epics.anl.gov/bcda/synApps/sscan/sscanRecord.html) as an [ophyd Flyer](http://nsls-ii.github.io/bluesky/async.html) for [bluesky](http://nsls-ii.github.io/bluesky) data acquisition.  Consider the case of [1D step scans using sscan record](https://epics.anl.gov/bcda/synApps/sscan/sscanRecord.html#HEADING_1-1).

In [1]:
import asyncio
from collections import deque, OrderedDict
import numpy as np
import time

%matplotlib notebook
from bluesky.utils import install_qt_kicker
install_qt_kicker()

# common IOC prefix to be used
P = "prj:"

In [2]:
from ophyd.scaler import ScalerCH
scaler = ScalerCH(f"{P}scaler1", name="scaler")
scaler.select_channels(None)

In [3]:
from ophyd import EpicsMotor
m1 = EpicsMotor(f"{P}m1", name="m1")

In [4]:
from apstools.synApps_ophyd import userCalcsDevice
calcs = userCalcsDevice(P, name="calcs")

In [5]:
from apstools.synApps_ophyd import sscanDevice
scans = sscanDevice(P, name="scans")
scans.select_channels()

In [6]:
from apstools.synApps_ophyd import SaveData
save_data = SaveData(f"{P}saveData_", name="save_data")

In [7]:
# configure saveData for data collection into MDA files:

# presumably the storage location: a file system root plus a subdirectory
# (inferred from the attribute names - confirm against the SaveData device)
save_data.file_system.put("/tmp")
save_data.subdirectory.put("saveData")
# file naming inputs: base name and the number assigned to the next scan
save_data.base_name.put("sscan1_")
save_data.next_scan_number.put(1)
# two free-text comment fields
save_data.comment1.put("testing")
save_data.comment2.put("configured and run from ophyd")

In [8]:
# configure the sscan record for data collection:

# clear out the weeds
scans.reset()

# work with the first sscan record only; 6 points from -1 to 0
scan = scans.scan1
scan.number_points.put(6)
# positioner 1 is motor m1: give the record the PV names of the
# motor's setpoint and readback signals
scan.positioners.p1.setpoint_pv.put(m1.user_setpoint.pvname)
scan.positioners.p1.readback_pv.put(m1.user_readback.pvname)
scan.positioners.p1.start.put(-1)
scan.positioners.p1.end.put(0)
scan.positioner_delay.put(0.0)
scan.detector_delay.put(0.1)
# detectors 1 and 2 are scaler channels 3 and 2 (note the order)
scan.detectors.d01.input_pv.put(scaler.channels.chan03.s.pvname)
scan.detectors.d02.input_pv.put(scaler.channels.chan02.s.pvname)
# trigger 1 is the scaler's count PV
scan.triggers.t1.trigger_pv.put(scaler.count.pvname)

# finally, reconfigure
scans.select_channels()

In [9]:
# make a noisy detector in an EPICS swait record, peak center at 2
from apstools.synApps_ophyd import swait_setup_lorentzian
swait_setup_lorentzian(calcs.calc2, m1, 2)
# the calc record's value signal is used as the detector from here on
noisy_det = calcs.calc2.val
noisy_det.kind = "hinted"

In [10]:
def ophyd_step_scan(motor):
    """
    Run a plain ophyd step scan of ``motor`` (no bluesky involved).

    The motor visits the ten integer positions -3 .. 6.  After each move
    one line is printed with the elapsed time, the motor position and a
    reading of the notebook-level ``noisy_det`` signal.  When the loop is
    done the motor is sent to 0 and the total elapsed time is printed.
    """
    started = time.time()
    for target in range(-3, 7):
        motor.move(target)
        # evaluate in the same order as the printed columns
        elapsed_text = "%8.3f" % (time.time() - started)
        position_text = "%8.2f" % motor.position
        reading_text = "%8.4f" % noisy_det.get()
        print(elapsed_text, position_text, reading_text)
    motor.move(0)
    total = time.time() - started
    print("Complete in %.3f seconds" % total)

# ophyd_step_scan(m1)

--------
## setup Bluesky, databroker, and the RunEngine

In [11]:
from databroker import Broker
db = Broker.named("mongodb_config")

In [12]:
from bluesky import RunEngine
import bluesky.plans as bp
from bluesky.callbacks.best_effort import BestEffortCallback
from bluesky import SupplementalData

# RunEngine with an empty metadata dictionary
RE = RunEngine({})
# every document emitted by the RunEngine is passed to db.insert
RE.subscribe(db.insert)
# BestEffortCallback also receives every document (it prints the
# scan header and table seen in the run output further below)
RE.subscribe(BestEffortCallback())
RE.preprocessors.append(SupplementalData())

Simple step scan using a bluesky plan:

In [13]:
# RE(bp.scan([noisy_det], m1, -5, 5, 11))

-------------------------

# Start to build our own Flyer

In [14]:
import ophyd
import bluesky
import threading
import logging

logger = logging.getLogger()

class MyFlyer(ophyd.Device):
    """
    starting template for a Flyer that we understand

    Runs a 1-D step scan with one EPICS sscan record and exposes it through
    the Flyer methods ``kickoff()``, ``complete()``, ``describe_collect()``
    and ``collect()``.

    The sscan record is started from a background thread in ``kickoff()``.
    ``complete()`` polls the record until it is idle and buffers one event
    per scan point; ``collect()`` then yields the buffered events.

    PARAMETERS

    sscan
        ophyd object for a single sscan record (for example ``scans.scan1``)
    motor
        positioner; must provide ``user_setpoint`` and ``user_readback``
        signals that each have a ``pvname``
    detectors
        list of signals (each with a ``pvname``), assigned in order to the
        sscan detector channels ``d01``, ``d02``, ...
    triggers
        list of signals (each with a ``pvname``), assigned in order to the
        sscan trigger channels ``t1``, ``t2``, ...
    first, last
        start and end positions written to positioner ``p1``
    npts
        number of points in the scan
    """

    def __init__(self, sscan, motor, detectors, triggers, first, last, npts, **kwargs):
        super().__init__('', parent=None, **kwargs)

        self.sscan = sscan
        self.motor = motor
        self.detectors = detectors
        self.triggers = triggers
        self.first = first
        self.last = last
        self.num_points = npts
        self.positioner_delay = 0.0
        self.detector_delay = 0.1
        self.scan_data_objects = None
        self.last_point = None
        self.data_buffer = None

        # status objects are created by kickoff(); None means "not started"
        self.kickoff_status = None
        self.complete_status = None
        self.t0 = 0

        self.sscan_init()

    def sscan_init(self):
        """
        init the sscan record

        Resets the record, writes the positioner, detector and trigger PV
        names, then records which data objects will be reported.

        Called from ``__init__()``, so these puts block the caller.
        """
        logger.info("sscan_init()")

        scan = self.sscan
        scan.reset()

        scan.positioners.p1.setpoint_pv.put(self.motor.user_setpoint.pvname)
        scan.positioners.p1.readback_pv.put(self.motor.user_readback.pvname)

        scan.detector_delay.put(self.detector_delay)
        # detector channels are named d01, d02, ... (1-based, two digits)
        for i, d in enumerate(self.detectors):
            getattr(scan.detectors, "d%02d" % (i+1)).input_pv.put(d.pvname)

        # trigger channels are named t1, t2, ... (1-based)
        for i, t in enumerate(self.triggers):
            getattr(scan.triggers, "t%d" % (i+1)).trigger_pv.put(t.pvname)

        # reconfigure the ophyd object for only the channels in use
        scan.select_channels()

        # do this now so describe_collect() and collect() can be tested
        self.scan_data_objects = self.identify_data_objects()

    def sscan_prep(self):
        """
        prep the sscan record

        Writes the scan range, number of points and positioner delay, and
        empties the event buffer.

        It's OK to use blocking calls here
        since this is called in a separate thread
        from the BlueSky RunEngine.
        """
        logger.info("sscan_prep()")

        scan = self.sscan

        scan.positioner_delay.put(self.positioner_delay)
        scan.positioners.p1.start.put(self.first)
        scan.positioners.p1.end.put(self.last)
        scan.number_points.put(self.num_points)

        self.data_buffer = []

    def identify_data_objects(self):
        """
        prepare a dictionary of the "interesting" ophyd data objects for this scan

        Returns an ``OrderedDict`` mapping a short key to each positioner
        and detector attribute whose name ends with ``_value``.  The key is
        the object's name with the leading ``<sscan name>_`` removed.
        """
        scan = self.sscan
        prefix = scan.name + "_"
        scan_data_objects = OrderedDict()
        for part in (scan.positioners, scan.detectors):
            for chname in part.read_attrs:
                if not chname.endswith("_value"):
                    continue
                obj = getattr(part, chname)
                # remove the prefix as a whole; str.lstrip() would strip
                # any leading run of the prefix *characters* instead
                if obj.name.startswith(prefix):
                    key = obj.name[len(prefix):]
                else:
                    key = obj.name
                scan_data_objects[key] = obj
        return scan_data_objects

    def my_activity(self):
        """
        start the "fly scan" here, could wait for completion

        Thread target used by ``kickoff()``.  Prepares and starts the sscan
        record, then marks ``kickoff_status`` as finished.  If starting the
        record raises, the status is finished with ``success=False`` so the
        caller is not left waiting on a status that never completes.

        It's OK to use blocking calls here
        since this is called in a separate thread
        from the BlueSky RunEngine.
        """
        logger.info("my_activity()")
        if self.complete_status is None:
            logger.info("leaving activity() - not complete")
            return

        scan = self.sscan
        try:
            # do the activity here
            self.sscan_prep()

            # finally, start the sscan record
            scan.execute_scan.put(1)

            # baseline for detecting point changes in complete()
            self.last_point = scan.current_point.get()
        except Exception:
            logger.exception("my_activity() could not start the sscan record")
            self.kickoff_status._finished(success=False)
            return

        # once started, we notify by updating the status object
        self.kickoff_status._finished(success=True)

    def kickoff(self):
        """
        Start this Flyer

        Creates fresh kickoff and complete status objects, starts
        ``my_activity()`` in a daemon thread and returns the kickoff status.
        """
        logger.info("kickoff()")
        self.kickoff_status = ophyd.DeviceStatus(self)
        self.complete_status = ophyd.DeviceStatus(self)

        thread = threading.Thread(target=self.my_activity, daemon=True)

        self.t0 = time.time()
        thread.start()
        return self.kickoff_status

    def get_event_data(self):
        """
        get the data for one collect event

        Reads every object in ``scan_data_objects`` and returns a dict with
        keys ``time``, ``data`` and ``timestamps``.  Also prints the time
        elapsed since ``kickoff()``.
        """
        t = time.time()
        elapsed = t - self.t0
        print(f"collecting: {elapsed}")

        dstream = OrderedDict()
        tstream = OrderedDict()
        for key, item in self.scan_data_objects.items():
            # only the first entry of the read() dictionary is used
            dd = list(item.read().values())[0]
            dstream[key] = dd["value"]
            tstream[key] = dd["timestamp"]

        event = dict(
            time=t,
            data=dstream,
            timestamps=tstream
        )
        return event

    def complete(self):
        """
        Wait for flying to be complete

        Polls the sscan record until its scan phase is idle, appending one
        event to ``data_buffer`` each time the current point changes to a
        value greater than zero.  This polling blocks the caller.

        Returns ``complete_status``, which is ``None`` if ``kickoff()`` has
        not been called.
        """
        logger.info("complete()")
        if self.complete_status is None:
            # raise RuntimeError("No collection in progress")
            logger.info("No collection in progress")
            return self.complete_status

        scan = self.sscan
        # wait/watch for completion
        # buffer any data events along the way
        while True:
            current_point = scan.current_point.get()
            if self.last_point != current_point and current_point > 0:
                self.data_buffer.append(self.get_event_data())
            self.last_point = current_point
            # data acquisition steps come *before* the exit
            if scan.scan_phase.get() in (0, "IDLE"):
                break
            time.sleep(0.001)
        self.complete_status._finished(success=True)

        return self.complete_status

    def describe_collect(self):
        """
        Describe details for ``collect()`` method

        Returns ``{"primary": stream}`` where ``stream`` has one entry
        (``source``, ``dtype``, ``shape``) per key in ``scan_data_objects``.
        """
        logger.info("describe_collect()")

        stream = OrderedDict()
        for key, item in self.scan_data_objects.items():
            stream[key] = dict(
                source = item.name,
                dtype = "number",
                shape = (1,)
            )
        return {"primary" : stream}

    def collect(self):
        """
        Get data from this Flyer (after scan is complete)

        yield events for each row in the data

        If ``kickoff()`` was never called, a single event read at call time
        is yielded instead (useful for testing).
        """
        logger.info("collect()")
        print("collect() started")

        if self.complete_status is None:
            logger.info("complete, scan was not run - must be testing phase")
            yield self.get_event_data()
        else:
            for ev in self.data_buffer:    # report any data that was collected
                yield ev

            logger.info("activity() complete. status = %s", self.complete_status)

In [15]:
# positional arguments follow MyFlyer.__init__():
#   sscan, motor, detectors, triggers, first, last, npts
ifly = MyFlyer(
    scans.scan1, 
    m1, 
    [scaler.channels.chan03.s, scaler.channels.chan02.s, calcs.calc2.val], 
    [scaler.count, calcs.calc2.process], 
    -3, 
    6, 
    6, 
    name="ifly")

In [16]:
# ifly.kickoff()

In [17]:
ifly.complete()

In [18]:
ifly.describe_collect()

{'primary': OrderedDict([('positioners_p1_readback_value',
               {'source': 'scans_scan1_positioners_p1_readback_value',
                'dtype': 'number',
                'shape': (1,)}),
              ('positioners_p1_setpoint_value',
               {'source': 'scans_scan1_positioners_p1_setpoint_value',
                'dtype': 'number',
                'shape': (1,)}),
              ('detectors_d01_current_value',
               {'source': 'scans_scan1_detectors_d01_current_value',
                'dtype': 'number',
                'shape': (1,)}),
              ('detectors_d02_current_value',
               {'source': 'scans_scan1_detectors_d02_current_value',
                'dtype': 'number',
                'shape': (1,)}),
              ('detectors_d03_current_value',
               {'source': 'scans_scan1_detectors_d03_current_value',
                'dtype': 'number',
                'shape': (1,)})])}

In [19]:
ifly.collect()

<generator object MyFlyer.collect at 0x7f238b073f10>

In [20]:
g = _
print(list(g))

collect() started
collecting: 1552331373.1976223
[{'time': 1552331373.1976223, 'data': OrderedDict([('positioners_p1_readback_value', 6.0), ('positioners_p1_setpoint_value', 6.0), ('detectors_d01_current_value', 2.0), ('detectors_d02_current_value', 1.0), ('detectors_d03_current_value', 0.056257233023643494)]), 'timestamps': OrderedDict([('positioners_p1_readback_value', 1552330869.960696), ('positioners_p1_setpoint_value', 1552330869.960696), ('detectors_d01_current_value', 1552330869.960696), ('detectors_d02_current_value', 1552330869.960696), ('detectors_d03_current_value', 1552330869.960696)])}]


In [21]:
RE(bp.fly([ifly]))

Transient Scan ID: 1     Time: 2019/03/11 14:09:33
Persistent Unique Scan ID: 'fe82184a-2e3c-4165-b002-936d2ae025ed'
collecting: 9.659096717834473
collecting: 12.168171644210815
collecting: 14.673133850097656
collecting: 17.17692804336548
collecting: 19.684027671813965
New stream: 'primary'
+-----------+------------+
|   seq_num |       time |
+-----------+------------+
collect() started
+-----------+------------+
generator fly ['fe82184a'] (scan num: 1)





('fe82184a-2e3c-4165-b002-936d2ae025ed',)

In [22]:
h = db[-1]
print(h.stream_names)

['primary']


In [23]:
h.table("primary")

Unnamed: 0_level_0,time,positioners_p1_readback_value,positioners_p1_setpoint_value,detectors_d01_current_value,detectors_d02_current_value,detectors_d03_current_value
seq_num,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
1,2019-03-11 14:09:42.902434349,-3.0,-1.2,1.0,1.0,0.038157
2,2019-03-11 14:09:45.411509275,-1.2,0.6,2.0,2.0,0.085698
3,2019-03-11 14:09:47.916471481,0.6,2.4,0.0,2.0,0.329418
4,2019-03-11 14:09:50.420265675,2.4,4.2,4.0,3.0,0.859881
5,2019-03-11 14:09:52.927365303,4.2,6.0,0.0,2.0,0.164117


In [24]:
list(h.documents())

[('start',
  {'time': 1552331373.2231252,
   'uid': 'fe82184a-2e3c-4165-b002-936d2ae025ed',
   'scan_id': 1,
   'plan_type': 'generator',
   'plan_name': 'fly'}),
 ('descriptor',
  {'run_start': 'fe82184a-2e3c-4165-b002-936d2ae025ed',
   'data_keys': {'positioners_p1_readback_value': {'source': 'scans_scan1_positioners_p1_readback_value',
     'dtype': 'number',
     'shape': [1]},
    'positioners_p1_setpoint_value': {'source': 'scans_scan1_positioners_p1_setpoint_value',
     'dtype': 'number',
     'shape': [1]},
    'detectors_d01_current_value': {'source': 'scans_scan1_detectors_d01_current_value',
     'dtype': 'number',
     'shape': [1]},
    'detectors_d02_current_value': {'source': 'scans_scan1_detectors_d02_current_value',
     'dtype': 'number',
     'shape': [1]},
    'detectors_d03_current_value': {'source': 'scans_scan1_detectors_d03_current_value',
     'dtype': 'number',
     'shape': [1]}},
   'time': 1552331395.436565,
   'uid': 'eadf9b26-88d6-4112-9086-a93605565b23'

# Conclusion

The ophyd *Flyer* is not well-suited to the case of the simple 1-D step scan using the sscan record.  With *Flyer* objects, the workflow is *kickoff*, *complete*, then *collect*, where `complete()` returns a status object and `collect()` is a generator of data events.  Since `collect()` is called only after the sscan is done, it has no chance to collect the data (with timestamps) after each step in the scan.  Instead, the Flyer is best for *silent* accumulation and caching of that data during the `complete()` method, then reporting it later in `collect()`.  **We could do better supporting this use of sscan with a bluesky plan.**

Good to know.