# Combining LabView BCS with Bluesky

## Overview

This notebook demonstrates how the LabView Beamline Control System API can be used to implement a "Single Motor Scan" as an `ophyd` device. Furthermore, this "Single Motor Scan" device can be initiated by the `bluesky` Run Engine as a `fly` plan with data avaliable in a bluesky run document.

### Brief Summary

The bluesky `fly` plan will cede control to an `ophyd` "[Fly-able Interface](http://nsls-ii.github.io/ophyd/architecture.html#fly-able-interface)" (e.g., [FlyerInterface](https://github.com/bluesky/ophyd/blob/dd4b3e389a0202ecacce39fc3965d703c616b0d4/ophyd/flyers.py#L17)) to collect data asynchronously, and then return data as `[event](https://blueskyproject.io/event-model/data-model.html#event-document)` documents.

The "Fly-able Interface" must:

1. `kickoff()` the scan
2. `complete()` data collection for the scan
3. `describe_collect()`: provide a descriptor for the data that is being collected
4. `collect()`: yield event document(s) that contain the data

More details about this concept of embedding external control system scans into bluesky can be found in [this example notebook](https://github.com/BCDA-APS/bluesky_training/blob/31-sscan-1D-as-flyer/sscan_1d_flyer.ipynb).

This example uses a _synchronous/blocking_ version of the BCS-API library (BCSz_sync.py), which is otherwise identical to the asynchronous version (BCSz.py). The blocking version was created because the bluesky Run Engine does not currently use `asyncio` `await` directive for the Fly-able interface methods.

## Credits

This example builds heavily upon two recently published works, as well as the many developers and contributors to the bluesky and BCS projects.

* [BCS-API](http://bcsapi.als.lbl.gov:3080): ZeroMQ library and python bindings (BCSz.py) developed by [Damon English](https://github.com/daenglis) for interfacing with the Advanced Light Source's BCS control system.
* [`sscan as 1D Flyer`](https://github.com/BCDA-APS/bluesky_training/blob/31-sscan-1D-as-flyer/sscan_1d_flyer.ipynb) example published by [Pete Jemian](https://github.com/prjemian) to demonstrate how the bluesky fly plan can be used to initiate scans and report data from external control systems (in that case, the EPICS sscan record).

# Prepare the notebook settings

In [1]:
from IPython.core.interactiveshell import InteractiveShell

# pretty print all cell's output and not just the last one
InteractiveShell.ast_node_interactivity = "all"

In [2]:
import matplotlib
%matplotlib notebook

In [3]:
%%javascript
var kernel = IPython.notebook.kernel;
var thename = window.document.getElementById("notebook_name").innerHTML;
var command = "theNotebook = " + "'"+thename+"'";
kernel.execute(command);

<IPython.core.display.Javascript object>

In [4]:
try:
    theNotebook
except NameError as e:
    theNotebook = "BCS-API_05_BlueskyFlying_M201Roll_BcszSync"
    theNotebook

'BCS-API_05_BlueskyFlying_M201Roll_BcszSync_nb'

In [5]:
import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# Setup bluesky

In [6]:
from bluesky import RunEngine

RE = RunEngine({})

In [7]:
from bluesky.callbacks.best_effort import BestEffortCallback
bec = BestEffortCallback()

# Send all metadata/data captured to the BestEffortCallback.
bec_token = RE.subscribe(bec)
bec.enable_table()  # Print hinted readings from the ‘primary’ stream in a LiveTable

# Make plots update live while scans run.
from bluesky.utils import install_kicker
install_kicker()

In [8]:
from dotenv import load_dotenv

load_dotenv()  # import environment variables from .env

True

In [9]:
from databroker import Broker
db = Broker.named('TEST')

# Insert all metadata/data captured into db.
RE.subscribe(db.insert)

1

In [10]:
from bluesky.utils import ProgressBarManager
RE.waiting_hook = ProgressBarManager()

In [11]:
RE.md['notebook'] = theNotebook
RE.md

{'versions': {'ophyd': '1.6.1', 'bluesky': '1.6.7'},
 'notebook': 'BCS-API_05_BlueskyFlying_M201Roll_BcszSync_nb'}

# Setup BCS-API

In [12]:
# These first two are necessary to use the API
import asyncio
import BCSz_sync

# The rest of the imports are for the Example program
import time
import random

# for working with images
from PIL import Image
import io
from IPython.display import display # to display images

# for plotting and working with files
import pandas as pd
import matplotlib.pyplot as plt
import os

In [13]:
from numpy import nan

## Helper classes and functions for reading data files

In [14]:
def find_bcs_data(data):
    """
    Keep trying to import the data into pandas until success
    The older BCS scans (so called 'Beamline Scans') have a variably sized header.
    """
    for skrows in range(30):
        data.seek(0)
        try:
            df = pd.read_csv(data, sep='\t', skiprows=skrows, nrows=10)
            df['Time (s)']
            break
        except (pd.errors.ParserError, KeyError):
            pass # try again with skrows +=1

    data.seek(0)
    return skrows


def plot_bcs_file(file_text='', plot_title='', x='Time (s)', y=None):
    """Display example plot for API testing"""
    data = io.StringIO(file_text)
    skip_rows = find_bcs_data(data)
    df = pd.read_csv(data, sep='\t', skiprows=skip_rows)
    print(df.columns)
    df.drop(0, inplace=True)
    print(df.head())

    if not y:
        y = random.choice(df.columns)

    df.plot(kind='scatter', x=x, y=y)
    plt.title(plot_title)
    plt.show()
    return df

# Connect to BCS server

In [15]:
bl402bcs = BCSz_sync.BCSServer()

bl402bcs.connect(addr=str(os.environ["BCS_SERVER"]), port=int(os.environ["BCS_PORT"]))

Server Public Key b'Of([HlwK7Q(<)Mni$[J7{%KwFDAEs$RG}7tbIOb&'


---
# PAUSE to check that beamline computer is available for testing

No, seriously!  Ensure that no one else is using the beamline.

---

# Scan motors, read data
## Careful!!!  This is moving real motors on the beamline

# Define Ophyd devices for BCS + Bluesky

In [16]:
from ophyd import Device, Component, Signal
from ophyd.status import DeviceStatus, SubscriptionStatus
from ophyd.flyers import FlyerInterface

## Ophyd device for BCS Single Motor Scan

In [17]:
class BcsSingleMotorScan(Device):
    """
    BCS Single Motor Scan (Move-Delay-Acquire) as an Ophyd device
    """
    
    # Signals to enable subscriptions for scan setup
    motor_name = Component(Signal, kind="config", value='')
    first_value = Component(Signal, kind="config", value=nan)
    last_value = Component(Signal, kind="config", value=nan)
    step_value = Component(Signal, kind="config", value=nan)
    delay_sec = Component(Signal, kind="config", value=0.)
    count_sec = Component(Signal, kind="config", value=1.)
    num_scans = Component(Signal, kind="config", value=1)
    bidirectional = Component(Signal, kind="config", value=False)
    final_move = Component(Signal, kind="config", value="Stay")
    memo = Component(Signal, kind="config", value='')
    filename_pattern = Component(Signal,kind="config", value="*.txt")
    
    # Signals to enable subscriptions/polling for scan status
    ready = Component(Signal, kind="omitted", value=False)
    busy = Component(Signal, kind="omitted", value=False)
    done = Component(Signal, kind="omitted", value=True)
    execute_scan = Component(Signal, kind="omitted", value=True)
    data_path = Component(Signal, kind="normal", value='')


    def scan_setup(
            self, *, 
            # server: str, port: int, 
            bcs_server: BCSz_sync.BCSServer, 
            motor: str, start: float, stop: float, step: float, 
            delay: float = 0., count: float = 1., 
            num_scans: int = 1, bidirect: bool = False, 
            final: str = "Stay",
            memo: str = '',
            filename_pattern: str = "*.txt", 
            **kwargs):
        """Configure a BCS Single Motor Scan"""
        
        # TODO: Check that bcs_server is connected
        self._bcs = bcs_server
        
        self.motor_name.put(motor)
        self.first_value.put(start)
        self.last_value.put(stop)
        self.step_value.put(step)
        self.delay_sec.put(delay)
        self.count_sec.put(count)
        self.num_scans.put(num_scans)
        self.bidirectional.put(bidirect)
        self.final_move.put(final)
        self.memo.put(memo)
        self.filename_pattern.put(filename_pattern)
        
        # Scan is configured and ready to execute
        self.ready.put(True)


    def set(self, value, **kwargs):
        """interface to use bps.mv()"""
        if value != 1:
            return
        if self.ready.get():
            bcs_st = self._bcs.scan_status()
            self.busy.put(bcs_st["running_scan"])
        if (not self.ready.get()) or (self.busy.get()):
            # TODO: Raise Warning that scan is not ready
            return

        async def check_for_acquire_done(self):
            bcs_st = self._bcs.scan_status()
            while bcs_st["running_scan"]:
                await asyncio.sleep(0.1)
                bcs_st = self._bcs.scan_status()
            else:
                # bcs_st = self._bcs.scan_status()
                print("Scan is finshed!")
                # print(f"{bcs_st =}...")
                self.data_path.put(os.path.join(bcs_st["Log Directory"], bcs_st["Last Filename"]))
                # print(f"{self.data_path.get() =}")
                self.done.put(True)
                self.busy.put(False)
                
        def check_value(*, old_value, value, **kwargs):
            "Return True when the acquisition is complete, False otherwise."
            return (value and not old_value)
        
        status = SubscriptionStatus(self.done, check_value)
        
        self.execute_scan.put(True)
        self._bcs.sc_single_motor_scan(
            x_motor=self.motor_name.get(), 
            start=self.first_value.get(), 
            stop=self.last_value.get(), 
            increment=self.step_value.get(), 
            delay_after_move_s=self.delay_sec.get(), 
            count_time_s=self.count_sec.get(), 
            number_of_scans=self.num_scans.get(),
            bidirect=self.bidirectional.get(),
            at_end_of_scan=self.final_move.get(), 
            description=self.memo.get(),
            file_pattern=self.filename_pattern.get(),
            )
        self.busy.put(True)
        self.done.put(False)
        # Give the scanner time to start
        time.sleep(1)  # TODO: Is this needed?
        self.execute_scan.put(False)
        acquire_task = asyncio.create_task(check_for_acquire_done(self))
        # acquire_task.add_done_callback()
        
        return status

In [18]:
ssM201Roll = BcsSingleMotorScan("BCS402:", name="ssM201Roll")

In [19]:
ssM201Roll.scan_setup(
        bcs_server=bl402bcs, 
        motor="M201 Roll", start=0., stop=-0.14, step=-0.02, 
        delay=0., count=0.1,
    )

In [20]:
ssM201Roll.get()

BcsSingleMotorScanTuple(motor_name='M201 Roll', first_value=0.0, last_value=-0.14, step_value=-0.02, delay_sec=0.0, count_sec=0.1, num_scans=1, bidirectional=False, final_move='Stay', memo='', filename_pattern='*.txt', ready=True, busy=False, done=True, execute_scan=True, data_path='')

### Usying sycnchronous BCSs library

In [21]:
ssM201Roll.set(1)

SubscriptionStatus(device=ssM201Roll_done, done=False, success=False)

In [22]:
# Wait for scan to finish
while ssM201Roll.busy.get():
    await asyncio.sleep(0.5)
    print("...", end=' ')

... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... Scan is finshed!
... 

In [23]:
ssM201Roll.busy.get()

False

In [24]:
ssM201Roll.done.get()

True

In [25]:
ssM201Roll.data_path.get()

'C:\\Beamline Controls\\BCS Setup Data\\210921\\Single Motor Scan 000050.txt'

## Helper functions for converting BCS data to Bluesky event model data

In [26]:
from bcs_events import *

In [27]:
from datetime import datetime
import event_model

## Ophyd flyer to run BCS Single Motor Scan as a Bluesky "flying" scan

In [28]:
class BcsSigScanFlyer(FlyerInterface, BcsSingleMotorScan):
    '''Example of BCS Single Motor Scan accessed through Bluesky'''
    
    yield_array_events = Component(Signal, kind="config", value=False)

    def __init__(self, *args, **kwargs):
        self._acquiring = False
        self._paused = False

        super().__init__(*args, **kwargs)
        
    def stage(self):
        super().stage()
        # self.select_channels()

    def unstage(self):
        super().unstage()
        # self.select_channels()

    def read_configuration(self):
        return {}

    def describe_configuration(self):
        return {}

    def kickoff(self):
        """Start the scan."""
        self.stage()
        time.sleep(0.1)

        # Check for currently running scan
        bcs_st = self._bcs.scan_status()
        if bcs_st["running_scan"]:
            raise RuntimeError("Cannot start scan. Another scan is already running.")
        
        self.set(1)
        self._acquiring = True
        self._paused = False

        status = DeviceStatus(self)
        status.set_finished()  # means that kickoff was successful
        return status

    def complete(self):
        """Wait for sscan to complete."""
        logging.info("complete() starting")
        if not self._acquiring:
            raise RuntimeError("Not acquiring")

        # status = DeviceStatus(self)
        # cb_started = False

        def is_scan_complete(*, old_value, value, **kwargs):
            "Return True when the acquisition is complete, False otherwise."
            # value = bool(value)
            if self._acquiring and value and not old_value:
                logging.info("complete() ending")
                self.unstage()
                self._acquiring = False
                return True
            return False

        status = SubscriptionStatus(self.done, is_scan_complete)
        
        return status

    def describe_collect(self):
        """
        Provide schema & meta-data from collect().
        
        http://nsls-ii.github.io/ophyd/generated/ophyd.flyers.FlyerInterface.describe_collect.html
        """
        # TODO: Add hinted signals
        
        ai_names = self._bcs.list_ais()['names']
        
        scan_motor = self.motor_name.get()
        scan_channels = [
            "Time of Day", 
            "Time (s)", 
            f"{scan_motor} Goal", 
            f"{scan_motor} Actual", 
            ] + ai_names
      
        # DataFrame enables convenience functions from 'bcs_events'
        data_df = pd.DataFrame(columns=scan_channels)

        sanitize_event_data_keys = {col: sanitize_key(col) 
            for col in data_df.columns[1:].values}

        descriptor_keys = get_descriptor_keys(
            data_df, 
            sanitize_event_data_keys,
            data_src="Inferred from AI List")
        
        event_stream_name = "primary"
        
        return {event_stream_name: descriptor_keys}

    def collect(self):
        """
        Retrieve all collected data (after complete()).
        
        Retrieve data from the flyer as proto-events.
        http://nsls-ii.github.io/ophyd/generated/ophyd.flyers.FlyerInterface.collect.html
        """
        if self._acquiring:
            raise RuntimeError("Acquisition still in progress. Call complete() first.")
        
        def get_data_from_scan(self):
            """Extract data from scan output file; return as PANDAS DataFrame."""
            file_path = self.data_path.get()
            if not file_path:
                raise RuntimeError("There is no scan data. Call kickoff() first.")
            
            file_text = self._bcs.get_text_file(file_path)['text']

            # Get date from BCS data file header
            with io.StringIO(file_text) as data_file:
                data_file_date_str = data_file.readline().strip().split("Date: ", 1)[1]
                data_date = datetime.strptime(data_file_date_str, "%m/%d/%Y").date()
                self._data_date = data_date
                
            data = io.StringIO(file_text)
            skip_rows = find_bcs_data(data)
            df = pd.read_csv(data, sep='\t', skiprows=skip_rows)
            return df

        def generate_scan_data_events(self):
            """Get the entire scan data and yield bluesky events."""
            data_df = get_data_from_scan(self)
            num_points = len(data_df)
            
            if self.yield_array_events.get():
                raise NotImplementedError("Array events not currently supported")
                # TODO: Implement array event generation

            # Run info will not be used by fly() plan
            # ...enables convenience functions from 'bcs_events'
            run_bundle = event_model.compose_run()
            event_stream_name = 'primary'

            sanitize_event_data_keys = {col: sanitize_key(col) 
                for col in data_df.columns[1:].values}

            descriptor_keys = get_descriptor_keys(
                data_df, 
                sanitize_event_data_keys,
                data_src=self.data_path.get())
    
            stream_descriptor = run_bundle.compose_descriptor(
                data_keys=descriptor_keys,
                name=event_stream_name,
                )

            # Get date from BCS data file header
#             with io.StringIO(self.data_path.get()) as data_file:
#                 data_file_date_str = data_file.readline().strip().split("Date: ", 1)[1]
#                 data_date = datetime.strptime(data_file_date_str, "%m/%d/%Y").date()
            data_date = self._data_date

            # This is only for array events
#             data_df = add_timestamps(data_df, data_date, inplace=True)
#             data_df.drop('Time of Day', axis=1, inplace=True)  # Redundant; have timestamps
#             timestamp_col="timestamp"
    
            data_df = get_timestamps(data_df, data_date, inplace=True)

            def get_bundled_event(data_row):
                return get_event(
                    data_row, sanitize_event_data_keys, stream_descriptor)

            # Pack events into an event_page
            events = data_df.apply(get_bundled_event, axis='columns').values

            # yield 'event_page', event_model.pack_event_page(*events)
            
            for event in events:
                # yield 'event', event
                yield dict(
                    seq_num=event["seq_num"],
                    time=event["time"],
                    data=event["data"],
                    timestamps=event["timestamps"],
                )

        yield from generate_scan_data_events(self)
        self.unstage()

In [29]:
fly_M201Roll = BcsSigScanFlyer("BCS402:", name="ssflyM201Roll")
fly_M201Roll.scan_setup(
        bcs_server=bl402bcs, 
        motor="M201 Roll", start=0.3, stop=-0.12, step=-0.02, 
        delay=0., count=0.1,
    )

In [30]:
fly_M201Roll.data_path.put('C:\\Beamline Controls\\BCS Setup Data\\210913\\Single Motor Scan 000033.txt')

In [31]:
fly_M201Roll.describe_collect()

{'primary': {'Time (s)': {'dtype': 'number',
   'source': 'Inferred from AI List',
   'shape': [],
   'units': 'sec'},
  'M201 Roll Goal': {'dtype': 'number',
   'source': 'Inferred from AI List',
   'shape': [],
   'units': 'mm'},
  'M201 Roll Actual': {'dtype': 'number',
   'source': 'Inferred from AI List',
   'shape': [],
   'units': 'mm'},
  'I0 BL': {'dtype': 'number',
   'source': 'Inferred from AI List',
   'shape': [],
   'units': 'counts / sec'},
  'I0 ES': {'dtype': 'number',
   'source': 'Inferred from AI List',
   'shape': [],
   'units': 'counts / sec'},
  'EY': {'dtype': 'number',
   'source': 'Inferred from AI List',
   'shape': [],
   'units': 'counts / sec'},
  'LY': {'dtype': 'number',
   'source': 'Inferred from AI List',
   'shape': [],
   'units': 'counts / sec'},
  'FY': {'dtype': 'number',
   'source': 'Inferred from AI List',
   'shape': [],
   'units': 'counts / sec'},
  'EY SCVM': {'dtype': 'number',
   'source': 'Inferred from AI List',
   'shape': [],
   'u

In [32]:
# for doc_type, doc in fly_M201Roll.collect():
#     print(doc_type)
#     print(doc)

for event_reading in fly_M201Roll.collect():
    print(event_reading)

{'seq_num': 1, 'time': 1632234869.231537, 'data': {'Time (s)': 4.161, 'M201 Roll Goal': 0.0, 'M201 Roll Actual': -0.001, 'I0 BL': 570.0, 'I0 ES': 0.0, 'EY': 1430.0, 'LY': 0.0, 'FY': 0.0, 'EY SCVM': 0.0, 'LY SCVM': 0.0, 'Clock': 100000.0, 'Original Clock': 0.0, 'Captured Mono Energy': 699.98718231, 'Energy': 699.84657908, 'Polarization': 0.0, 'Beam Current': -0.128479, 'Temp A': 0.0, 'Temp B': 0.0, 'Temp HT': 0.0, 'X': -0.5, 'Y': 9.0, 'Z': 0.01492063, 'Theta': -90.002, 'M206 Pitch': 5.76, 'I0 BL Amp (nA)': 1.0, 'I0 ES Amp (nA)': 2.0, 'EY Amp (nA)': 0.2, 'LY Amp (nA)': 0.5, 'FY Amp (nA)': -1000.0, 'EY SCVM Amp (nA)': 5.0, 'LY SCVM Amp (nA)': 5.0, 'EPU Energy': 1984.032, 'EPU Gap': 100.99997187, 'EPU Phase': 0.01600859, 'EPU A': 8.59e-06, 'EPU B': -1.743e-05, 'EPU Harmonic': 3.0, 'Mono Energy': 699.84657908, 'Grating Posn': 29206.25, 'Grating Goal': 29204.60637376, 'Grating Error': 184.5, 'Premirror Posn': 26750.72, 'Premirror Goal': 26750.25360588, 'Premirror Error': -54.5, 'Premirror St

### Run the Bluesky flying plan

In [33]:
fly_M201Roll = BcsSigScanFlyer("BCS402:", name="ssflyM201Roll")

In [34]:
fly_M201Roll.scan_setup(
        bcs_server=bl402bcs, 
        motor="M201 Roll", start=0.3, stop=-0.3, step=-0.02, 
        delay=0., count=0.1,
    )

In [35]:
fly_M201Roll.get()

BcsSigScanFlyerTuple(motor_name='M201 Roll', first_value=0.3, last_value=-0.3, step_value=-0.02, delay_sec=0.0, count_sec=0.1, num_scans=1, bidirectional=False, final_move='Stay', memo='', filename_pattern='*.txt', ready=True, busy=False, done=True, execute_scan=True, data_path='', yield_array_events=False)

In [36]:
from bluesky.plans import fly

RE(fly([fly_M201Roll], md=dict(purpose="demo bluesky fly plan with BCS Single Motor step scan")))



Transient Scan ID: 1     Time: 2021-09-21 07:34:34
Persistent Unique Scan ID: '1d1fc7f8-a638-452d-b399-e6b3aeaf643d'


INFO:root:complete() starting


ssflyM201Roll_done [In progress. No progress bar available.]                                                           
Scan is finshed!


INFO:root:complete() ending


ssflyM201Roll_done [In progress. No progress bar available.]                                                           
                                                                                                                       
New stream: 'primary'
+-----------+------------+
|   seq_num |       time |
+-----------+------------+




+-----------+------------+
generator fly ['1d1fc7f8'] (scan num: 1)





INFO:bluesky.RE.state:Change state on <bluesky.run_engine.RunEngine object at 0x0000020E13875EE0> from 'running' -> 'idle'
INFO:bluesky:Cleaned up from plan <generator object fly at 0x0000020E1D3A12E0>


('1d1fc7f8-a638-452d-b399-e6b3aeaf643d',)

In [37]:
fly_M201Roll.busy.get()
fly_M201Roll.done.get()
fly_M201Roll.data_path.get()

False

True

'C:\\Beamline Controls\\BCS Setup Data\\210921\\Single Motor Scan 000051.txt'

In [38]:
run = db.v2[-1]
data = run.primary.read()
data.plot.scatter("M201 Roll Actual", "I0 BL")

<IPython.core.display.Javascript object>

<matplotlib.collections.PathCollection at 0x20e1d5825e0>

In [39]:
data.plot.scatter("M201 Roll Goal", "M201 Roll Actual")

<IPython.core.display.Javascript object>

<matplotlib.collections.PathCollection at 0x20e1d5198e0>

In [40]:
plt.scatter(
    data["M201 Roll Goal"], 
    data["M201 Roll Actual"] - data["M201 Roll Goal"],
    );
plt.xlabel("M201 Roll Goal");
plt.ylabel("M201 Roll Error");
plt.show()

<IPython.core.display.Javascript object>

<matplotlib.collections.PathCollection at 0x20e1d668c10>

Text(0.5, 0, 'M201 Roll Goal')

Text(0, 0.5, 'M201 Roll Error')

# Which control layer is ophyd using

In [50]:
from ophyd import cl

cl

namespace(setup=<function ophyd._pyepics_shim.setup(logger)>,
          caput=<function epics.caput(pvname, value, wait=False, timeout=60)>,
          caget=<function epics.caget(pvname, as_string=False, count=None, as_numpy=True, use_monitor=False, timeout=5.0)>,
          get_pv=<function epics.pv.get_pv(pvname, form='time', connect=False, context=None, timeout=5.0, connection_callback=None, access_callback=None, callback=None, verbose=False, count=None, auto_monitor=None)>,
          thread_class=epics.ca.CAThread,
          name='pyepics',
          release_pvs=<function ophyd._pyepics_shim.release_pvs(*pvs)>,
          get_dispatcher=<function ophyd._pyepics_shim.get_dispatcher()>)