In [2]:
from IPython.display import display, Markdown, Latex

In [3]:
from notebook.services.config import ConfigManager
cm = ConfigManager()

In [4]:
cm.update('livereveal', {
              'width': 1000,
              'height': 600,
              'scroll': True,
})

{'width': 1000, 'height': 600, 'scroll': True}

In [5]:
# Let's see if the required installation is available
import ophyd
import bluesky


# Select a read only variable which gives a new output every now and then
# I could use the simulation ones from ophyd. I prefer to use some real 
# device

# One read only device
sig_ro = 'TOPUPCC:rdCur'

# A second read only device
sig_ro2 = 'MCLKHX251C:freq'

# Furthermore let's use good practice: a single bluesky run engine
has_run_engine = False
try:
    RE
    has_run_engine = True
except NameError:
    pass
    
if not has_run_engine:
    RE = bluesky.RunEngine()
    
assert(callable(RE))

In [6]:
# Let's set up nb kicker
import matplotlib
%matplotlib inline
import bluesky.utils
bluesky.utils.install_nb_kicker()

In [7]:
#Let's test that the epics variables are really available
import epics
import functools

# A little hack to minimize load on the real IOC's
@functools.lru_cache(maxsize=None)
def create_pv(name):
    t_pv = epics.PV(name)
    return t_pv

# This test should print some values.
# I do it directly using epics.PV to have direct control
# on connections etc ....
for ro_name in [sig_ro, sig_ro2]:
    a_pv = create_pv(ro_name)
    value = a_pv.value
    print(f'Epics PV {ro_name}: Got value {value}')

Epics PV TOPUPCC:rdCur: Got value 0.0
Epics PV MCLKHX251C:freq: Got value 499621.31788209


<h1> Introduction to Bluesky and Ophyd </h1>

Author: Pierre Schnizer


<h2> Literature </h2>

<h3> Python knowledge </h3>

* classes and inheritance
* generators  
* decorators
* asyncio
* Global Intepreter Lock



<h2> Literature </h2>

<h3> Python </h3>
The author assumes that the reader is familiar with python language version 3. 
Bluesky and Ophyd make have use of:

* asyncio
* generators  
* decorators
* Global Intepreter Lock



<h3> Abstraction </h3>

Most of these implementation details are abstracted in an elegant manner. 
But ocassioally, in particular when debugging 
a faulty device driver, it can be helpful to be awaare of the general concepts of asynchronous programming.

Generators are used by bluesky plans or plan stubs. If plans are used, which are provided by bluesky,
the user does not need to worry about these details.

If one wants to implement ophyd device drivers, the concept of classes should be understood.

<h3> Bluesky and ophyd links </h3>

* Bluesky and ophyd are available from https://github.com/bluesky/bluesky
* Documentation: https://blueskyproject.io/bluesky/
* Tutorial: https://github.com/bluesky/tutorial/
* Chat https://gitter.im/NSLS-II/DAMA

<h2> General Concepts </h2>

Bluesky is used with already two meanings:
* Bluesky collaboration: a set of different modules used to simplify taking data within the EPICS control system environement
* Bluesky as the python module, which provides the RunEngine and plans

In the following I will use "Bluesky" as reference to the module. If I want to refer to the whole package, I refer to it
as "Bluesky measurement orchestration"



<h3> Bluesky build up</h3>

Bluesky collaboration uses the following concpets:
* A control system (EPICS) is responsible for taking data
* This control system provides asynchronos and parallel access to the devices using variables
* It separates device abstraction from measurement plans
* In this concept:

    * Ophyd allows making devices accessible abstracted as classes with different methods
    * Bluesky provides measurement plans and the run engine
    * The run engine executes the plans by calling appropriate methods of the involved devices. 
      (This description is a bit imprecise and will be detailed further below)




<h3> Asynchronous access</h3>

At this stage, one could still directly access the different process variables. But the run engine
assures that these are accessed in an asynchronous fashion. How this works is detailed below.

Furthermore:
* these modules (as much as python) are not real time software (and in particular not 
  *hard* real time software)
* but it has the tools to:
  * set up different hard real time devices
  * to get them going 
  * and to collect their data in the end
* It is right to control a small or medium size experiments

Your millage may vary ;-)

<h3> Device driver example </h3>

Let's look to a simple device driver. For the beginning let's assume we have a read only
Epics variable with name `sig_ro`.


In [8]:
from ophyd import Device, Component as Cpt, EpicsSignalRO

class ReadonlyDevice(Device):
    """A simple sample device
    
    This devivce is solely used to describe the methods
    """
    ro = Cpt(EpicsSignalRO, sig_ro)

In [9]:
# let's keep namespace clean 
del Device, Cpt, EpicsSignalRO

<h3> Instantiating Run Engine </h3>
    
* A Run Engine is required to execute plans for executing the measurement.
* A Run Eninge should be tyically only instantiated once. 
* So better to check if one is there already:

In [10]:
RE

<bluesky.run_engine.RunEngine at 0x7fc45c42a1d0>

* if a name error occurs use the code below
* don't forget to uncomment the run engine defintion

In [11]:
from bluesky import RunEngine

# if the code above gives a name error, 
# uncomment the following line
# RE = RunEngine({})

<h3>Executing a simple measurement</h3>

* That's all the setup required if nothing fancy needed
* Now let's read the detector 5 times

In [12]:
from bluesky import plans as bps 

ro_dev = ReadonlyDevice(name = 'sig')

if not ro_dev.connected:
    ro_dev.wait_for_connection()

detectors = [ro_dev]
RE(bps.count(detectors, 5))

('82518b35-0f0e-484b-b793-08b43913c116',)

<h3> Device: What's happening under the hood?</h3>

The `Device` code is not yet really showing what happens, so lets see what is happening under the hood.

This we can do if we use the same code as above and enable logging

In [13]:
# Check that debug print works!
#RE.log.setLevel('INFO')
ro_dev = ReadonlyDevice(name = 'sig')


if not ro_dev.connected:
    ro_dev.wait_for_connection()

detectors = [ro_dev]
RE.log.setLevel('DEBUG')
RE(bps.count(detectors, 1))

[I 17:28:43.480 run_engine:821] Executing plan <generator object count at 0x7fc451f10f48>
[I 17:28:43.481 run_engine:116] Change state on <bluesky.run_engine.RunEngine object at 0x7fc45c42a1d0> from 'idle' -> 'running'
[D 17:28:43.482 run_engine:1421] stage: (ReadonlyDevice(prefix='', name='sig', read_attrs=['ro'], configuration_attrs=[])), (), {}
[D 17:28:43.482 run_engine:1421] open_run: (None), (), {'detectors': ['sig'], 'num_points': 1, 'num_intervals': 0, 'plan_args': {'detectors': ["ReadonlyDevice(prefix='', name='sig', read_attrs=['ro'], configuration_attrs=[])"], 'num': 1}, 'plan_name': 'count', 'hints': {'dimensions': [(('time',), 'primary')]}}
[D 17:28:43.483 run_engine:1631] Emitted RunStart (uid='7cdde793-221a-4864-8338-cc32ea7cabcf')
[D 17:28:43.483 run_engine:1421] checkpoint: (None), (), {}
[D 17:28:43.484 run_engine:1421] trigger: (ReadonlyDevice(prefix='', name='sig', read_attrs=['ro'], configuration_attrs=[])), (), {'group': 'trigger-7f4ba2'}
[D 17:28:43.484 run_engin

('7cdde793-221a-4864-8338-cc32ea7cabcf',)

Now let's look to the output. There is quite some output being presented. 
What is happending? Let's look under the hood:

* There is a call to trigger made
* There is a call to read

So what's the difference?
* The method trigger is allowed to take time (but not significant computation time!) 
* The method read is used to retrieve the data

What are the two methods doing (using the :class:`ophyd.Device`):
* The method "trigger" triggers a device and waits for it to deliver data
* The method "read" reads actually the data


**NB** who to actually trigger a device is shown further down

Let's intercept the methods to get a better understanding

<h3> Setting up logging output </h3>

Bluesky and ophyd simplify logging if one
* intercepts calls: method overloading
* activates logging accordingly

<h4> Logging by method overloading <h4>

In [14]:
from ophyd import Component as Cpt, Device as _oDevice, EpicsSignalRO as _oEpicsSignalRO
class LogMethodCalls:
    def trigger(self):
        status = super().trigger()
        cls_name = self.__class__.__name__
        txt = f'{cls_name} method trigger returned {status}'
        self.log.debug(txt)
        return status
    
    def read(self):
        result = super().read()
        cls_name = self.__class__.__name__
        txt = f'{cls_name} method read returned {result}'
        self.log.debug(txt)
        return result

class Device(LogMethodCalls, _oDevice):
    pass

class EpicsSignalRO(LogMethodCalls, _oEpicsSignalRO):
    pass


<h4> Device logging </h4>

* With the fragment above the code only requires to be executed again
* **NB** please take care that 'Device' and 'EpicsSignalRO' are not imported from 'ophyd'

In [15]:
class ReadonlyDevice(Device):
    """A simple sample device
    
    This devive is solely implemented as an example for a simple read only device
    """
    ro = Cpt(EpicsSignalRO, sig_ro)

<h4> Setting up loggers </h4>

* Bluesky has a nice formatted logger
* logging.basicConfig interfers with this setup
* Nicer output is given by 

This is achieved in the following manner
* instantiate the device


In [16]:
ro_dev = ReadonlyDevice(name = 'sig')

Install the appropriate handler
* select bluesky handler
* register it with (the parent) of the device logger

In [17]:
bp_re_log = RE.log.parent.handlers[0]
ro_dev.log.parent.addHandler(bp_re_log)

Finally set the logging level appropriately

In [18]:
RE.log.setLevel('WARNING')
ro_dev.log.setLevel('DEBUG')

Execute the code

In [19]:
if not ro_dev.connected:
    ro_dev.wait_for_connection()

detectors = [ro_dev]
RE(bps.count(detectors, 1))

[D 17:28:43.524 device:520] Staging sig
[D 17:28:43.525 <ipython-input-14-7b7f6cdf7089>:7] ReadonlyDevice method trigger returned DeviceStatus(device=sig, done=True, success=True)
[D 17:28:43.529 <ipython-input-14-7b7f6cdf7089>:14] EpicsSignalRO method read returned {'sig_ro': {'value': 0.0, 'timestamp': 1566919723.50771}}
[D 17:28:43.529 <ipython-input-14-7b7f6cdf7089>:14] ReadonlyDevice method read returned OrderedDict([('sig_ro', {'value': 0.0, 'timestamp': 1566919723.50771})])
[D 17:28:43.531 device:586] Unstaging sig


('a66c4842-20ac-40c3-9c41-5c1db3d1ca71',)

<h4> Logging Summary </h4>

* See full code below
* **NB** the code is reordered
* target: logging output only where required    


In [20]:
ro_dev = ReadonlyDevice(name = 'sig')
if not ro_dev.connected:
    ro_dev.wait_for_connection()

detectors = [ro_dev]

bp_re_log = RE.log.parent.handlers[0]
ro_dev.log.parent.addHandler(bp_re_log)

RE.log.setLevel('WARNING')
ro_dev.log.setLevel('DEBUG')
RE(bps.count(detectors, 1))

[D 17:28:43.591 device:520] Staging sig
[D 17:28:43.592 <ipython-input-14-7b7f6cdf7089>:7] ReadonlyDevice method trigger returned DeviceStatus(device=sig, done=True, success=True)
[D 17:28:43.595 <ipython-input-14-7b7f6cdf7089>:14] EpicsSignalRO method read returned {'sig_ro': {'value': 0.0, 'timestamp': 1566919723.50771}}
[D 17:28:43.596 <ipython-input-14-7b7f6cdf7089>:14] ReadonlyDevice method read returned OrderedDict([('sig_ro', {'value': 0.0, 'timestamp': 1566919723.50771})])
[D 17:28:43.597 device:586] Unstaging sig


('afb3ae90-0d76-4a27-a4c9-9a62538277bb',)

<h3> Device method evaluation </h3>

The evaluation shows different methods

Now we see a difference:
* trigger returns a status object
* while read returns data

What's a status object:
* a delayed return value
* think of the tokens you get in self service restaurants: they buzz as soon as your order is finished


This is the general concept of ophyd and bluesky:
* methods returning data are expected to execuate without any delay
* methods returning status objects are allowed responding with some delay.
  (but not to take computation. How to do that is detailed furhter down)
  
What'S the status object: well think of the new resteraunts that give you a token when you order: 
the token rings as soon as you are finished.

I'll detail that in an other example:

In [21]:
ro_dev = ReadonlyDevice(name = 'sig')

# Yes that is a hack 
# But there are other methods to get it better organised
if not ro_dev.connected:
    ro_dev.wait_for_connection()

# ro_dev.log.setLevel('DEBUG')
detectors = [ro_dev]
RE(bps.count(detectors, 1))

[D 17:28:43.654 device:520] Staging sig
[D 17:28:43.655 <ipython-input-14-7b7f6cdf7089>:7] ReadonlyDevice method trigger returned DeviceStatus(device=sig, done=True, success=True)
[D 17:28:43.657 <ipython-input-14-7b7f6cdf7089>:14] EpicsSignalRO method read returned {'sig_ro': {'value': 0.0, 'timestamp': 1566919723.50771}}
[D 17:28:43.658 <ipython-input-14-7b7f6cdf7089>:14] ReadonlyDevice method read returned OrderedDict([('sig_ro', {'value': 0.0, 'timestamp': 1566919723.50771})])
[D 17:28:43.660 device:586] Unstaging sig


('d81891eb-c76b-4354-a64f-04e7abe3cd5d',)

In [22]:
from ophyd.status import DeviceStatus

class ReadonlyDevice(Device):
    """A simple sample device
    
    This devivce is solely used to describe the methods
    
    Warning:
        This code is not properly made
    """
    ro = Cpt(EpicsSignalRO, sig_ro)
    
    def trigger(self):
        """
        
        Warning:
         This code is not the most elegant one ....
          See further down for a simpler one
        """
        cid = None
        def cb(*args, **kwargs):
            self.ro.clear_sub(cb)
            status._finished()

        status = DeviceStatus(self.ro, timeout = 10)
        self.ro.subscribe(cb)
        return status

The trigger method shows:
* a status object is created. 
* a callback is subscribed to the variable
* a status object is returned.

How is that code executed:
* The run engine will call trigger on the method  of this object
* it will recieve a status object. 

The run eninge will take the status object and wait for it to complete. If different devices are to be checked it 
will wait that the last one has completed. In this manner it ensures that the different variables are handled in 
parallel. 

Please note: typically this code uses epics.PV. Thus this approach is not suitable for a large set of variables!


This code can be further simplified if we use SubscriptionStatus instead of device status. Let's implement 
the smae code again


In [23]:
from ophyd.status import SubscriptionStatus

import sys

class ReadonlyDevice(Device):
    """A simple sample device
    
    """
    ro = Cpt(EpicsSignalRO, sig_ro)
    
    def trigger(self):
        """Ensure that new data has arrived
        """
        def cb(*args,  **kwargs):
            value = kwargs['value']
            old_value = kwargs['old_value']
            timestamp = kwargs['timestamp']
            txt = f'cb {cb}: value = {value} old value {old_value} timestamp {timestamp}\n'
            self.log.debug(txt)
            return True
        
        status = SubscriptionStatus(self.ro, cb, run = False)
        return status

In [24]:
ro_dev = ReadonlyDevice(name = 'sig')

if not ro_dev.connected:
    ro_dev.wait_for_connection()

detectors = [ro_dev]
RE(bps.count(detectors, 1))

[D 17:28:43.686 ophydobj:187] This is the first instance of OphydObject. name={self.name}, id={id(self)}
[D 17:28:43.738 device:520] Staging sig
[D 17:28:43.748 <ipython-input-23-d985b3628c31>:19] cb <function ReadonlyDevice.trigger.<locals>.cb at 0x7fc4517526a8>: value = 0.0 old value None timestamp 1566919723.50771
    
[D 17:28:43.749 <ipython-input-14-7b7f6cdf7089>:14] EpicsSignalRO method read returned {'sig_ro': {'value': 0.0, 'timestamp': 1566919723.50771}}
[D 17:28:43.750 <ipython-input-14-7b7f6cdf7089>:14] ReadonlyDevice method read returned OrderedDict([('sig_ro', {'value': 0.0, 'timestamp': 1566919723.50771})])
[D 17:28:43.751 device:586] Unstaging sig


('050a63ed-cc59-45a4-98d3-cdc80ec7a3bd',)

So now we arrive at some pretty straight forward code

When trigger is called, the object SubscriptionStatus will subscribe the function (here ensure_run_once) to self.ro.
Then the callback 'cb' is called every time when new data arrrive at signal self.ro.

The keyword argument run=False, ensures that the method is only called the first time when  new data is arrived.
It defaults to True. In this case it will be run immediately after the callback 'cb' has been subscribed 
to the device.

Please note again, that all these triggers are expected to behave well. IF you need to wait or to do processing,
see further down how to do that



Now we have used a very simple device Now let's see if we have a device with two signals

In [25]:
class ReadonlyDevice(Device):
    """A simple sample device
    
    """
    ro = Cpt(EpicsSignalRO,  sig_ro)
    ro2 = Cpt(EpicsSignalRO, sig_ro2)

Now lets run the same simple example again as above:

In [26]:
from bluesky import plans as bps, RunEngine 


ro_dev = ReadonlyDevice(name = 'sig')


if not ro_dev.connected:
    ro_dev.wait_for_connection()

detectors = [ro_dev]
RE(bps.count(detectors, 1))



[D 17:28:43.767 ophydobj:187] This is the first instance of OphydObject. name={self.name}, id={id(self)}
[D 17:28:43.821 device:520] Staging sig
[D 17:28:43.822 <ipython-input-14-7b7f6cdf7089>:7] ReadonlyDevice method trigger returned DeviceStatus(device=sig, done=True, success=True)
[D 17:28:43.823 <ipython-input-14-7b7f6cdf7089>:14] EpicsSignalRO method read returned {'sig_ro': {'value': 0.0, 'timestamp': 1566919723.50771}}
[D 17:28:43.836 <ipython-input-14-7b7f6cdf7089>:14] EpicsSignalRO method read returned {'sig_ro2': {'value': 499621.31788209, 'timestamp': 1565684923.028094}}
[D 17:28:43.836 <ipython-input-14-7b7f6cdf7089>:14] ReadonlyDevice method read returned OrderedDict([('sig_ro', {'value': 0.0, 'timestamp': 1566919723.50771}), ('sig_ro2', {'value': 499621.31788209, 'timestamp': 1565684923.028094})])
[D 17:28:43.838 device:586] Unstaging sig


('30e59bbb-59bc-4f6e-b495-12975a4fea71',)

The  important thing to note now:
* We can see one trigger
* and two read calls.

That's the policy of bluesky (as far as I understand)
* It will call trigger only for the top device. It is 
  the responsiblity of this device to delegate the triggers to other objects
  if required.
  
* Reads are then transfered to all other devices.

Please note that devices can be generated as a tree. The user can call handle 
either the top device or a leaf of the tree to the run engine. The run engine
will call the method trigger only on the object passed to it.

In this manner the device driver writer can organise how the device is read.
For example, let's ensure that the data of the second signal has arrived only
after the signal for the first one has arrived.

This can be implemented in different manners. The one I chose is used to demonstrate
the add_callback method of the status object.

In [27]:
from ophyd import Component as Cpt
from ophyd.status import SubscriptionStatus, AndStatus

class ReadonlyDevice(Device):
    """Ensure data of signal ro2 are always newer than for signal ro. 
    """
    ro = Cpt(EpicsSignalRO,  sig_ro)
    ro2 = Cpt(EpicsSignalRO, sig_ro2)
    
    def trigger(self):
        
        ro_done = False
        def cb(*args, **kwargs):
            return True
    
        def cb2(*args, **kwargs):
            #nonlocal ro_done
            if ro_done:
                return True
            return False
    
        def delayed_call(*args, **kwargs):
            #nonlocal ro_done
            ro_done = True
        
        stat2 = SubscriptionStatus(self.ro, cb2, run = False, timeout = 5)
        stat = SubscriptionStatus(self.ro, cb, run = False, timeout = 5)
        stat.add_callback(delayed_call)
        s = AndStatus(stat2, stat)
        return s

In [28]:
from bluesky import plans as bps, RunEngine 

# Check that debug print works!
#RE = RunEngine()
#RE.log.setLevel('DEBUG')


ro_dev = ReadonlyDevice(name = 'sig')

# Yes that is a hack 
# But there are other methods to get it better organised
ro_dev.log = RE.log
if not ro_dev.connected:
    ro_dev.wait_for_connection()

detectors = [ro_dev]
RE(bps.count(detectors, 1))

[D 17:28:43.872 ophydobj:187] This is the first instance of OphydObject. name={self.name}, id={id(self)}
[E 17:28:48.934 run_engine:1522] Run aborted
    Traceback (most recent call last):
      File "/home/mfp/Devel/nsls_ii/bluesky/bluesky/run_engine.py", line 1390, in _run
        msg = self._plan_stack[-1].send(resp)
      File "/home/mfp/Devel/nsls_ii/bluesky/bluesky/plans.py", line 67, in count
        return (yield from inner_count())
      File "/home/mfp/Devel/nsls_ii/bluesky/bluesky/utils.py", line 1045, in dec_inner
        return (yield from plan)
      File "/home/mfp/Devel/nsls_ii/bluesky/bluesky/preprocessors.py", line 940, in stage_wrapper
        return (yield from finalize_wrapper(inner(), unstage_devices()))
      File "/home/mfp/Devel/nsls_ii/bluesky/bluesky/preprocessors.py", line 501, in finalize_wrapper
        ret = yield from plan
      File "/home/mfp/Devel/nsls_ii/bluesky/bluesky/preprocessors.py", line 938, in inner
        return (yield from plan)
      File

Exception in thread Thread-7:
Traceback (most recent call last):
  File "/home/mfp/Devel/nsls_ii/ophyd/ophyd/status.py", line 79, in _wait_and_cleanup
    wait(self, timeout=timeout, poll_rate=0.2)
  File "/home/mfp/Devel/nsls_ii/ophyd/ophyd/status.py", line 574, in wait
    '(elapsed {:.2f} sec)'.format(timeout, elapsed))
TimeoutError: Operation failed to complete within 5.0 seconds (elapsed 5.01 sec)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.7/threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "/home/mfp/Devel/nsls_ii/ophyd/ophyd/status.py", line 87, in _wait_and_cleanup
    self._handle_failure()
  File "/home/mfp/Devel/nsls_ii/ophyd/ophyd/status.py", line 299, in _handle_failure
    self.device.stop()
AttributeError: 'EpicsSignalRO' object has no attribute 'stop'

Exception in

FailedStatus: AndStatus(done=True, success=False)

This code could be chained further on. Such code can extended further on and stacked. 
Based on my experience: if the device needs more handling I recommend to consider to 
base the code on a state engine.

<h3> Summary </h3>

So to sum up:

Bluesky run eninge is based on cooperative multi tasking. It assumes that all calls to ophyd device require only a short time to run.
Methods that return a status object, are expected to return the status object immediately. 

* bluesky run engine $\to$ cooperative multi task
* ophyd device driver:
   * typically: methods are expected short sequence of CPU
   * methods returning status objects:
      * Device status objects
         * return status object immediatly
         * status object method '_finished()' called when done
      * or use 'SubscriptionStatus'
    * examples given here: callbacks triggered by epics variable changes
      ophyd subscribes to the device


<h2>Ophyd signals: everything is a signal </h2>  

Compariosn: Unix everything is a file

Ophyd:
* Everything is a signal
* Signals are used to create events
* Lifeplots can use them
* Bluesky run engine uses signals to gather device information
* See further down on event model to understand how it simplifies
  measurements

<h2> Settable devices </h2>

The description above concentrated on generating devices, which allow only reading data. 
Let's see how to implement that. Here I will not go into the basis but show how it can be done
using the already implemented bases of ophyd.

(I did all the details above as I wanted to illustrate the difference of direct calls to delayed 
calls).

For ophyd settable devices are consideed as "motors". Any device that has a set value and a read value falls into 
this category. Personally I prefer to think of an actuator instead of a motor.

Ophyd has several implementations, which facilitate implementing actuators:
* Motors
* PVPositioner
* PVPositionerPC

Let's implement a PVPositioner (see also https://github.com/TMsangohan/bact2/blob/66d4bea4f13507b1b82ccb3577fc6a7bfc4b627c/examples/exp/mono_chromator.py#L1)

In [29]:
from ophyd import EpicsSignal, EpicsSignalRO, PVPositioner, Component as Cpt

class Monochromator( PVPositioner ):
    setpoint    = Cpt(EpicsSignal,   ':monoSetEnergy')
    readback    = Cpt(EpicsSignalRO, ':monoGetEnergy')
    done        = Cpt(EpicsSignalRO, ':multiaxis:running')

<h3> PV positioner </h3>

Ophyd PV Positioner: requires three variables:
* setpoint: a variable that describes the desired value
* readback: a variable that descrives the read back value
* done: a variable that indicates if the move is finished

**Note**: ophyd will consider the move finished when the variable done is set to **True**
Setpoint is only used to set the value, readback to read the value
Based on these signals the user can get feedback on how long it will take that the 
move is finished

<h3> PV positioner </h3>

Ophyd PV Positioner: more functionality:
* done: indication value can be set to **False**
* stop value: used to request a stop to the actuator

See an implementation for a BESSY II monochromator for a idea of it

In [30]:
from ophyd import EpicsSignal, EpicsSignalRO, PVPositioner, Component as Cpt

class Monochromator( PVPositioner ):
    setpoint    = Cpt(EpicsSignal,   ':monoSetEnergy')
    readback    = Cpt(EpicsSignalRO, ':monoGetEnergy')
    done        = Cpt(EpicsSignalRO, ':multiaxis:running')
    done_val = 0
    stop_signal = Cpt(EpicsSignal,   ':multiaxis:stop')
    stop_val = 1

<h3>Adapting it further to our needs</h3>
    
The monochromator now is fairly nicely  fulfilling our needs. But we can go further....

Ophyd follows the idea of intelligent devices:
* Settle time: how long does it take a device to stabilise after 
  the movement has signalled finished (e.g. a bar stops osszilating)
* Time out: maximum time that the movement is allowwed to take

**NB** these are variables of the status object. Thus' these can be 
computed if required....


In [31]:
class _Monochromator( PVPositioner ):
    setpoint    = Cpt(EpicsSignal,   ':monoSetEnergy')
    readback    = Cpt(EpicsSignalRO, ':monoGetEnergy')
    done        = Cpt(EpicsSignalRO, ':multiaxis:running')
    done_val = 0
    stop_signal = Cpt(EpicsSignal,   ':multiaxis:stop')
    stop_val = 1


    def trigger(self):
        """Trigger only after new data has arrived
        Deactivated not to interfere with :class:`PVPositioner`
        
        Warning:
            Untested code!!!
            
        Better test it first
        """
        status = super().trigger()
        status.set_settle

        def cb_rdbk(*args, **kwargs):
            return True
        
        stat_rdbk = SubscriptionStatus(self.readback, cb_rdbk, run = False, timeout = 1, settle_time = 2)
        return stat_rdbk

class Monochromator( Device ):
    dev = Cpt(_Monochromator, 'u171dcm1', name = 'mc', egu = 'eV')

<h3> Everything is a signal </h3>

Settle time and timeouts are values, which can be convieniently stored in the event document. 

This can be handled too: make these values signals


In [32]:
from ophyd import PVPositioner, Component as Cpt, Signal, EpicsSignal, EpicsSignalRO

class _Monochromator( PVPositioner ):
    setpoint    = Cpt(EpicsSignal,   ':monoSetEnergy')
    readback    = Cpt(EpicsSignalRO, ':monoGetEnergy')
    done        = Cpt(EpicsSignalRO, ':multiaxis:running')
    done_val = 0
    stop_signal = Cpt(EpicsSignal,   ':multiaxis:stop')
    stop_val = 1

    timeout    = Cpt(Signal, 'timeout', default=1.0)
    settle_time = Cpt(Signal, 'settle_time', default=2.0), 
    
    def trigger(self):
        """Trigger only after new data has arrived
        Deactivated not to interfere with :class:`PVPositioner`
        
        Warning:
            Untested code!!!
            
        Better test it first
        """
        status = super().trigger()
    
        stat_rdbk = SubscriptionStatus(self.readback, cb_rdbk, run = False, 
                                       timeout = self.timeout.value, 
                                       settle_time = self.settle_time.value)
        stat_rdbk.add_callback(status)
        return stat_rdbk

<h3> Marking signals as configuration signals</h3>

Blueskys RunEngin will emit event documents. The setup is described in http://nsls-ii.github.io//architecture-overview.html.

As a rule of thumb:
* Ophydd devices can contain a tree of devices
* Bluesky inspects the devices and read all signals
* The information of all signals will be added to each event



In [33]:
class _Monochromator( PVPositioner ):
    # Class members only stored in configuration run
    _default_configuration_attrs = ('timeout', 'settle_time')

In this case the timeout and settle time variables are read out only at the beginning of the run.

<h3> A slow moving device </h3>

If we have a slow moving device, perhaps we want to compute the acceptable 
time for the movement. This time can depend very much on the required time ...


Here two points should be separated:
* appropriate time out value depends on the path length (see next slide)
* how to inform the user, that progress takes some time


In [34]:
class SlowMovingDevice ( PVPositioner ):
    setpoint = Cpt(EpicsSignal,   ':set')
    readback = Cpt(EpicsSignalRO, ':rbk')
    done     = Cpt(EpicsSignalRO, ':done')
    
    timeout    = Cpt(Signal, 'timeout', default=1.0)
    settle_time = Cpt(Signal, 'settle_time', default=2.0), 

    
    def computeMovementTime(self, current, target):
        """
        Todo:
            compute settle time
        """
        
        return settle_time
    
    def set(self, value):
        """
        
        WARNING:
            Untested code!
        """
        current = self.readback.value
        movement_time = self.computeMovementTime(current, value)
        # how much to add to the pure movement time?
        # typically a percentage and an additional value
        setlle_time = movement_time
        self.settle_time.set(settle_time)
        
        # create a status object or add the settle time to the 
        # status object


<h3> Informing the user on progress </h3>

* Progress bar: provided by bluesky
* Requires attachment to RunEngine


In [35]:
from bluesky.utils import ProgressBarManager
# if you are running from shell: seems to become obsolete
# from bluesky.utils import install_qt_kicker
from bluesky.utils import install_nb_kicker

install_nb_kicker()
RE.waiting_hook = ProgressBarManager()

<h4> Info to progress bar manager </h4>

* Modell: Buzzer in the beer garden
* Normally: buzzes when dish is ready
* preferable version: 
    * have a display showing when dish expected to finish
    * e.g. cook entering periodically when happen next  
* ophyd: device has to inform on status

see `MoveStatus` https://github.com/bluesky/ophyd/blob/master/ophyd/status.py
and in particular `watch` method 
see next slide

In [36]:
class FollowStatus(ophyd.status.SubscriptionStatus):
    def watch(self, func):
        """
        Subscribe to notifications about progress. Useful for progress bars.
        Parameters
        ----------
        func : callable
            Expected to accept the keyword aruments:
                * ``name``
                * ``current``
                * ``initial``
                * ``target``
                * ``unit``
                * ``precision``
                * ``fraction``
                * ``time_elapsed``
                * ``time_remaining``
        """
    

<h4> Implementation example: Movestatus</h4>

In [37]:
class FollowStatus(ophyd.status.SubscriptionStatus):
    """
    Warning:
        Do not use: just for illustration purpose
    """
    def __init__(self, dev, *args, **kws):
        super().__init__(self, dev, *args, **kws)
         
        self.device.subscribe(self._notify_watchers)
            
    def _notify_watchers(self, value, *args, **kwargs):
        if not self._watchers:
            return
        
        for watcher in self._watchers:
            # watcher(name=, current = , initial = , target  = , unit= , precision = , time_elapsed=, fraction=)
            pass

Be aware: 
 * this example: triggered by changes on device
 * Typically updates every **50 ms**
 * bluesky expectes updates at least every **0.2 s**
 * If slower: will complain that no information is available

<h3> Watching progress </h3>

In [38]:
from ophyd.sim import SynAxis, det

class SimMoveStatus(DeviceStatus):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
        if not self.done:
            self.device.subscribe(self._notify_watchers,
                                   # event_type=self.device.SUB_READBACK
                                  )

    def _settled(self,  *args, **kwargs):
        self.device.clear_sub(self._notify_watchers)
        self._watchers.clear()

    def _notify_watchers(self, value, *args, **kwargs):
        if not self._watchers:
            return
        
        dev = self.device
        for watcher in self._watchers:
            watcher(name = dev.name, current = dev.readback, target = dev.setpoint
            )

class SlowSimulatedAxis(SynAxis):
    def set(self, value):
        stat = super().set(value)
        #self.log.warning(f"status = {status} of type {type(status)}")
        
        stat2 = SimMoveStatus(device = self)
        def cb_finished(**kwargs):
            nonlocal stat2
            stat2._finished()
        stat.add_callback(cb_finished)
        
        return stat2
        
motor = SlowSimulatedAxis(name = 'motor', delay = .5)
from bluesky.utils import ProgressBarManager
from bluesky.callbacks.best_effort import BestEffortCallback

import bluesky.plans as bp

RE.waiting_hook = ProgressBarManager()
# RE.subscribe(BestEffortCallback())
RE.log.setLevel('WARNING')
detectors = [det]
RE(bp.scan(detectors, motor, 0, 3, num=2))

Transient Scan ID: 9     Time: 2019-08-27 17:29:16
Persistent Unique Scan ID: '28009dff-258c-49db-95f9-5b27fc6bc82e'
motor [In progress. No progress bar available.]                                
[AA 'deferred pause' has been requested. The RunEngine will pause at the next checkpoint. To pause immediately, hit Ctrl+C again in the next 10 seconds.Deferred pause acknowledged. Continuing to checkpoint.

trying a second time
Pausing...
                                                                               
[A

RunEngineInterrupted: 
Your RunEngine is entering a paused state. These are your options for changing
the state of the RunEngine:

RE.resume()    Resume the plan.
RE.abort()     Perform cleanup, then kill plan. Mark exit_stats='aborted'.
RE.stop()      Perform cleanup, then kill plan. Mark exit_status='success'.
RE.halt()      Emergency Stop: Do not perform cleanup --- just stop.


In [None]:
bp.scan?

In [None]:
RE.stop()

<h1> Code fragments from gitter </h1>

In [None]:
from ophyd import Device, Component as Cpt, EpicsSignal, Signal, DeviceStatus

class AccumulatingSignal(Device):
     target = Cpt(EpicsSignal, 'thermo:I')
     window_size = Cpt(Signal, value=5)
     last_read = Cpt(Signal, value=[])     


     def trigger(self):

          dbuffer = []
          count = 0
          target_N = self.window_size.get()
          status = DeviceStatus(self)

          def accumulating_callback(value, **kwargs):
               if status.done:
                    self.target.clear_sub(accumulating_callback)
               nonlocal count

               dbuffer.append(value)
               count += 1

               if count >= target_N:
                    self.last_read.put(dbuffer[:target_N])
                    self.target.clear_sub(accumulating_callback)                         
                    status._finished()

          self.target.subscribe(accumulating_callback, run=False)          

          return status

     def put(self, *args, **kwargs):
           raise Exception("no, read only")