# sscan as 1D Flyer

<font color="red">NOTE</font>: This notebook is under construction!

## 1D step scans using sscan record

Use the [sscan record](https://epics.anl.gov/bcda/synApps/sscan/sscanRecord.html) as an [ophyd Flyer](http://nsls-ii.github.io/bluesky/async.html) for [bluesky](http://nsls-ii.github.io/bluesky) data acquisition.  Consider the case of [1D step scans using sscan record](https://epics.anl.gov/bcda/synApps/sscan/sscanRecord.html#HEADING_1-1).

Suggest the _noisy v m1_ scan, done as 1-D step scan with sscan record.  noisy is the swait record calculating a peak based on m1 position.

In [1]:
import os, pathlib, sys
sys.path.append(os.path.abspath(os.path.join(pathlib.Path.home(), "bluesky")))
from instrument.collection import *

I Fri-10:45:27 - ############################################################ startup
I Fri-10:45:27 - logging started
I Fri-10:45:27 - logging level = 10
I Fri-10:45:27 - /home/prjemian/bluesky/instrument/collection.py
I Fri-10:45:27 - /home/prjemian/bluesky/instrument/mpl/notebook.py


Activating auto-logging. Current session state plus future input saved.
Filename       : /home/prjemian/Documents/projects/BCDA-APS/bluesky_training/.logs/ipython_console.log
Mode           : rotate
Output logging : True
Raw input log  : False
Timestamping   : True
State          : active


I Fri-10:45:27 - #### Bluesky Framework ####
I Fri-10:45:27 - /home/prjemian/bluesky/instrument/framework/check_python.py
I Fri-10:45:27 - /home/prjemian/bluesky/instrument/framework/check_bluesky.py
I Fri-10:45:28 - /home/prjemian/bluesky/instrument/framework/initialize.py
I Fri-10:45:28 - using databroker catalog 'training'
I Fri-10:45:28 - /home/prjemian/bluesky/instrument/framework/metadata.py
I Fri-10:45:29 - #### Devices ####
I Fri-10:45:29 - /home/prjemian/bluesky/instrument/devices/area_detector.py
I Fri-10:45:29 - /home/prjemian/bluesky/instrument/devices/calculation_records.py
I Fri-10:45:31 - /home/prjemian/bluesky/instrument/devices/ioc_stats.py
I Fri-10:45:31 - /home/prjemian/bluesky/instrument/devices/motors.py
I Fri-10:45:31 - /home/prjemian/bluesky/instrument/devices/noisy_detector.py
I Fri-10:45:31 - /home/prjemian/bluesky/instrument/devices/scaler.py
I Fri-10:45:32 - /home/prjemian/bluesky/instrument/devices/shutter_simulator.py
I Fri-10:45:32 - /home/prjemian/bluesky

In [2]:
# add this notebook's name to the metadata

RE.md["notebook"] = "sscan_1d_flyer"

In [3]:
# define the IOC's prefix

ioc = "gp:"

## Bluesky step scan

We are told that the `noisy` signal will show a peak when `m1` is moved over the range `[-1 .. +1]`.  Use *bluesky* to show that peak.

In [None]:
RE(bp.scan([noisy], m1, -1.1, 1.1, 21))

In [None]:
# from apstools.synApps import SaveData

# save_data = SaveData(f"{ioc}saveData_", name="save_data")

## sscan as Bluesky Flyer

In [59]:
from apstools.synApps import SscanRecord

class MySscanRecord(SscanRecord):

    def stage(self):
        super().stage()
        self.select_channels()

    def unstage(self):
        super().unstage()
        self.select_channels()

    def setup_staging_1D_step(self, start=-1.1, finish=1.1, npts=21, ddelay=0.01, pdelay=0):
        # configure sscan record for 1D step scan: noisy v. m1
        self.xref = dict(
            positioners=[m1,],
            raw_detectors=[noisy, ],
            detectors=[noisy, m1.user_setpoint]  # include motor setpoints array
        )
        self.stage_sigs["number_points"] = npts
        self.stage_sigs["pasm"] = "PRIOR POS"
        self.stage_sigs["positioner_delay"] = pdelay
        for i, p in enumerate(self.xref["positioners"]):
            self.stage_sigs[f"positioners.p{i+1}.setpoint_pv"] = p.user_setpoint.pvname
            self.stage_sigs[f"positioners.p{i+1}.readback_pv"] = p.user_readback.pvname
            self.stage_sigs[f"positioners.p{i+1}.start"] = start
            self.stage_sigs[f"positioners.p{i+1}.end"] = finish
        self.stage_sigs["detector_delay"] = ddelay
        for i, d in enumerate(self.xref["detectors"]):
            self.stage_sigs[f"detectors.d{i+1:02d}.input_pv"] = d.pvname

        # get timestamp of each point in the scan
        # This is a sscan record trick that returns the time (s) since the scan started.
        self.stage_sigs[f"positioners.p4.readback_pv"] = "time"

        # self.hints = {'fields': ['scan_positioners_p1_readback_value', 'scan_detectors_d01_current_value']}

scan = MySscanRecord(f"{ioc}scan1", name="scan")

# configure the sscan record for data collection:
scan.wait_for_connection()  # sscan records have _many_ channels and fields
scan.reset()  # clear out any previous configuration

scan.setup_staging_1D_step()
scan.select_channels()

In [60]:
# TODO: if save data is to be used

    # # configure saveData for data collection into MDA files:
    # save_data.stage_sigs["file_system"] = "/tmp"
    # save_data.stage_sigs["subdirectory"] = "saveData"
    # save_data.stage_sigs["base_name"] = "sscan1_"
    # save_data.stage_sigs["next_scan_number"] = 1
    # save_data.stage_sigs["comment1"] = "testing"
    # save_data.stage_sigs["comment2"] = "configured and run from ophyd"

In [61]:
# diagnostics
# -----------
scan.stage()
# scan.unstage()
# RE(bp.count([scan]))
# scan.read()
# scan.stage??
# bp.fly??
# scan.positioners.p1.array.read()
# scan.detectors.d01.array.read()
# scan.current_point.read()

In [17]:
n = scan.current_point.get()
obj = scan.detectors.d01.array
data = obj.read()[obj.name]
obj.name

'scan_detectors_d01_array'

In [84]:
# TODO: develop method to get scan data from sscan and render dict for flyer.collect()

n = scan.current_point.get()
ts_last_point = scan.current_point.read()[scan.current_point.name]["timestamp"]

def get_sscan_data(arr, n):
    data = obj.read()[obj.name]
    data["value"] = list(data["value"][:n])
    return data

# get the per-step time stamps from positioner 4
time_stamps = scan.positioners.p4.array.get(use_monitor=False)[:n]
time_stamps = ts_last_point + time_stamps - time_stamps.max()
print(f"{time_stamps = }")

results = dict(data={}, timestamps={}, time=None, seq_num=1)
# This gets the full array for each item in one document
# TODO: get the arrays and yield one row at a time with each timestamp
for category, signals in scan.xref.items():
    print(f"{category = }")
    for i, signal in enumerate(signals):
        if category == "positioners":
            item = f"p{i+1}"
        elif category == "detectors":
            item = f"d{i+1:02d}"
        else:
            continue
        obj = getattr(scan, f"{category}.{item}.array")
        data = get_sscan_data(obj, n)
        print(data)
        print("-"*10)
        results["data"][signal.name] = data["value"]
        results["timestamps"][signal.name] = data["timestamp"]
        if results["time"] is None:
            results["time"] = data["timestamp"]
        # print(f"{i+1} {signal.name} {obj.name}")

print(results)

time_stamps = array([1.63069337e+09, 1.63069337e+09, 1.63069337e+09, 1.63069337e+09,
       1.63069337e+09, 1.63069337e+09, 1.63069337e+09, 1.63069337e+09,
       1.63069337e+09, 1.63069338e+09, 1.63069338e+09, 1.63069338e+09,
       1.63069338e+09, 1.63069338e+09, 1.63069338e+09, 1.63069338e+09,
       1.63069338e+09, 1.63069338e+09, 1.63069338e+09, 1.63069338e+09,
       1.63069338e+09])
category = 'positioners'
{'value': [-1.1, -0.99, -0.88, -0.77, -0.66, -0.55, -0.44, -0.33, -0.22, -0.11, 0.0, 0.11, 0.22, 0.33, 0.44, 0.55, 0.66, 0.77, 0.88, 0.99, 1.1], 'timestamp': 1630683862.61889}
----------
category = 'raw_detectors'
category = 'detectors'
{'value': [12.990758, 14.700112, 17.19369, 19.874727, 23.983894, 28.49756, 35.105396, 43.312847, 58.13608, 77.162796, 106.2024, 166.10352, 277.25006, 573.58344, 1748.3751, 23488.064, 6030.8057, 1047.845, 424.0055, 222.18307, 140.48398], 'timestamp': 1630693379.588263}
----------
{'value': [-1.1, -0.99, -0.88, -0.77, -0.66, -0.55, -0.44, -0.33,

In [18]:
data["value"] = data["value"][:n]
data

{'value': array([1.2956302e+01, 1.4792508e+01, 1.6984699e+01, 2.0519512e+01,
        2.3728724e+01, 2.8188606e+01, 3.5111115e+01, 4.3683010e+01,
        5.7565067e+01, 7.5888741e+01, 1.0948700e+02, 1.6188750e+02,
        2.7265936e+02, 5.6619434e+02, 1.7620000e+03, 2.3679143e+04,
        5.9099243e+03, 1.0533992e+03, 4.2077960e+02, 2.2589296e+02,
        1.3686008e+02], dtype=float32),
 'timestamp': 1630684587.411814}

In [None]:
xref = dict(noisy)

In [None]:
from ophyd import DeviceStatus, Signal
from ophyd.flyers import FlyerInterface

In [None]:
class SscanFlyer_1D_StepSimple(FlyerInterface):
    
    sscan = None
    name = None
    
    def __init__(self, sscan, *args, name=None, **kwargs):
        self.sscan = sscan
        self._acquiring = False

        if name is None:
            raise ValueError("MUST provide `name` keyword argument.")
        self.name = name

        super().__init__(*args, **kwargs)

    def kickoff(self):
        # scan.reset()

        scan.setup_staging_1D_step()

        # set(), do not `yield`, in kickoff()
        self.sscan.execute_scan.set(1)  # start the sscan record
        self._acquiring = True

        status = DeviceStatus(self)
        status.set_finished()  # means that kickoff was successful
        return status

    def complete(self):
        """Wait for sscan to complete."""
        if not self._acquiring:
            raise RuntimeError("Not acquiring")

        st = DeviceStatus(self)
        cb_started = False

        def execute_scan_cb(value, timestamp, **kwargs):
            value = int(value)
            if cb_started and value == 0:
                scan.unstage()
                self._acquiring = False
                self.sscan.execute_scan.unsubscribe(execute_scan_cb)
                if not st.done:
                    logging.info("Setting %s execute status to `done`.", self.sscan.name)
                    st.set_finished()

        self.sscan.execute_scan.subscribe(execute_scan_cb)
        # self.sscan.execute_scan.set(1)
        cb_started = True
        return st

    def describe_collect(self):
        # http://nsls-ii.github.io/ophyd/generated/ophyd.flyers.FlyerInterface.describe_collect.html
        # Provide schema & meta-data from collect()
        dd = {}
        dd.update(m1.describe())
        dd.update(noisy.describe())
        return {self.name: dd}
        # return {
        #     "primary": {
        #         m1.name: {
        #             "dims": [],
        #             "dtype": "number",
        #             "shape": [],
        #             "source": "",
        #         },
        #         noisy.name: {
        #             "dims": [],
        #             "dtype": "number",
        #             "shape": [],
        #             "source": "",
        #         },
        #         # "time": {
        #         #     "dims": [],
        #         #     "dtype": "number",
        #         #     "shape": [],
        #         #     "source": "",
        #         # },
        #     },
        # }

    def collect(self):
        """Retrieve all collected data (after complete())."""
        if self._acquiring:
            raise RuntimeError("Acquisition still in progress. Call complete() first.")
        # http://nsls-ii.github.io/ophyd/generated/ophyd.flyers.FlyerInterface.collect.html
        # Retrieve data from the flyer as proto-events

        ts = time.time()
        p1 = [0, 1]
        d1 = [1, 0]
        yield {
            "data": {m1.name: p1, noisy.name: d1, },
            'timestamps': {m1.name: ts, noisy.name: ts, },
            'time': ts
        }

        # ts = time.time()
        # n = self.sscan.current_point.get()
        # p1 = scan.positioners.p1.array.get(use_monitor=False, as_numpy=True)[:n]
        # d1 = scan.detectors.d01.array.get(use_monitor=False, as_numpy=True)[:n]

        # yield {
        #     'data': {
        #         m1.name: p1,
        #         noisy.name: d1,
        #     },
        #     'timestamps': {
        #         m1.name: ts,
        #         noisy.name: ts,
        #     },
        #     'time': ts
        # }

In [None]:
scan_flyer = SscanFlyer_1D_StepSimple(scan, name="scan_flyer")

In [None]:
RE(bp.fly([scan_flyer]))

## MockFlyer with motor and pseudo-detector

ophyd's *MockFlyer* example : https://github.com/NSLS-II/ophyd/blob/master/ophyd/sim.py#L546

In [None]:
import asyncio
from collections import deque
from collections import OrderedDict
from ophyd import DeviceStatus
import time

class MockFlyer:
    """
    Class for mocking a flyscan API implemented with stepper motors.
    """

    def __init__(self, name, detector, motor, start, stop, num, loop=None):
        self.name = name
        self.parent = None
        self._mot = motor
        self._detector = detector
        self._steps = np.linspace(start, stop, num)
        self._data = deque()
        self._completion_status = None
        if loop is None:
            loop = asyncio.get_event_loop()
        self.loop = loop

    def __setstate__(self, val):
        name, detector, motor, steps = val
        self.name = name
        self.parent = None
        self._mot = motor
        self._detector = detector
        self._steps = steps
        self._completion_status = None
        self.loop = asyncio.get_event_loop()

    def __getstate__(self):
        return (self.name, self._detector, self._mot, self._steps)

    def read_configuration(self):
        return OrderedDict()

    def describe_configuration(self):
        return OrderedDict()

    def describe_collect(self):
        dd = dict()
        dd.update(self._mot.describe())
        dd.update(self._detector.describe())
        return {'stream_name': dd}

    def complete(self):
        if self._completion_status is None:
            raise RuntimeError("No collection in progress")
        return self._completion_status

    def kickoff(self):
        if self._completion_status is not None:
            raise RuntimeError("Already kicked off.")
        self._data = deque()

        self._future = self.loop.run_in_executor(None, self._scan)
        st = DeviceStatus(device=self)
        self._completion_status = st
        self._future.add_done_callback(lambda x: st._finished())
        return st

    def collect(self):
        if self._completion_status is None or not self._completion_status.done:
            raise RuntimeError("No reading until done!")
        self._completion_status = None

        yield from self._data

    def _scan(self):
        "This will be run on a separate thread, started in self.kickoff()"
        time.sleep(.1)
        for p in self._steps:
            stat = self._mot.set(p)
            while True:
                if stat.done:
                    break
                time.sleep(0.01)
            stat = self._detector.trigger()
            while True:
                if stat.done:
                    break
                time.sleep(0.01)

            event = dict()
            event['time'] = time.time()
            event['data'] = dict()
            event['timestamps'] = dict()
            for r in [self._mot, self._detector]:
                d = r.read()
                for k, v in d.items():
                    event['data'][k] = v['value']
                    event['timestamps'][k] = v['timestamp']
            self._data.append(event)

    def stop(self, *, success=False):
        pass


In [None]:
mflyer = MockFlyer('mflyer', noisy, m1, -1.1, 1.2, 29)

In [None]:
RE(bp.scan([noisy], m1, -1.1, 1.1, 11))

In [None]:
run = cat[-1]
run.primary.read()