# Blue Sky Run Engine

Contents:

* The Run Engine processes messages
* There is two-way communication between the message generator and the Run Engine
* Control timing with 'sleep' and 'wait'
* Runs can be aborted
* Data can be consumed live by user-defined functions

In [25]:
%run bs.py

motor = Mover('motor', ['pos'])
det = SynGauss('sg', motor, 'pos', center=0, Imax=1, sigma=1)

## The Run Engine processes messages

A message has four parts: a command string, an object, a tuple of positional arguments, and a dictionary of keyword arguments.

In [26]:
Msg('set', motor, {'pos': 5})

set: (mover: motor), ({'pos': 5},), {}

In [27]:
Msg('trigger', motor)

trigger: (mover: motor), (), {}

In [28]:
Msg('read', motor)

read: (mover: motor), (), {}
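
The four parts map naturally onto a named tuple. The block below is a minimal sketch of that idea, not the `Msg` class loaded from `bs.py`; the name `SketchMsg` and its field names are assumptions chosen for illustration, and a plain string stands in for the motor object.

```python
from collections import namedtuple


class SketchMsg(namedtuple("SketchMsg", ["command", "obj", "args", "kwargs"])):
    """Hypothetical stand-in for Msg: command, object, args tuple, kwargs dict."""

    __slots__ = ()

    def __new__(cls, command, obj=None, *args, **kwargs):
        # Extra positional and keyword arguments are packed into the
        # last two fields of the tuple.
        return super().__new__(cls, command, obj, args, kwargs)

    def __repr__(self):
        return "{}: ({}), {}, {}".format(
            self.command, self.obj, self.args, self.kwargs
        )


# A string is used in place of a real motor object.
msg = SketchMsg("set", "motor", {"pos": 5})
print(msg)  # set: (motor), ({'pos': 5},), {}
```

Packing the trailing arguments this way is what lets `Msg('trigger', motor)` and `Msg('set', motor, {'pos': 5})` share one constructor.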

In [29]:
RE = RunEngine()

In [6]:
def simple_scan(motor):
    "Set, trigger, read"
    yield Msg('set', motor, {'pos': 5})
    yield Msg('trigger', motor)
    yield Msg('read', motor)
    
RE.run(simple_scan(motor))

Emitted RunStart:
{'beamline_id': 'test', 'uid': '01d84316-c77e-4b2b-805f-143aae45f892', 'scan_id': 123, 'time': 1432999373.570635, 'owner': 'tester'}
set: (mover: motor), ({'pos': 5},), {}
   ret: None
trigger: (mover: motor), (), {}
   ret: None
read: (mover: motor), (), {}
   ret: {'pos': {'timestamp': 1432999373.875185, 'value': 5}}
Emitted RunStop:
{'exit_status': 'success', 'run_start': '01d84316-c77e-4b2b-805f-143aae45f892', 'reason': '', 'time': 1432999374.077595}


Moving a motor and reading it back is boring. Let's add a detector.

In [7]:
def simple_scan2(motor, det):
    "Set, trigger motor, trigger detector, read"
    yield Msg('set', motor, {'pos': 5})
    yield Msg('trigger', motor)
    yield Msg('trigger', det)
    yield Msg('read', det)
    
RE.run(simple_scan2(motor, det))

Emitted RunStart:
{'beamline_id': 'test', 'uid': '23ba1252-fe53-4df7-b864-2106df807c9e', 'scan_id': 123, 'time': 1432999375.54955, 'owner': 'tester'}
set: (mover: motor), ({'pos': 5},), {}
   ret: None
trigger: (mover: motor), (), {}
   ret: None
trigger: (reader: sg), (), {}
   ret: None
read: (reader: sg), (), {}
   ret: {'intensity': {'value': 3.7266531720786709e-06, 'timetamp': 1432999375.960118}}
Emitted RunStop:
{'exit_status': 'success', 'run_start': '23ba1252-fe53-4df7-b864-2106df807c9e', 'reason': '', 'time': 1432999376.170663}


## There is two-way communication between the message generator and the Run Engine

Above, we saw each message alongside the response it generated from the RunEngine. We can use these responses to make our scan adaptive.

In [8]:
def adaptive_scan(motor, det, threshold):
    """Set, trigger, read until the detector reads intensity < threshold"""
    i = 0
    while True:
        print("LOOP %d" % i)
        yield Msg('set', motor, {'pos': i})
        yield Msg('trigger', motor)
        yield Msg('trigger', det)
        reading = yield Msg('read', det)
        if reading['intensity']['value'] < threshold:
            print('DONE')
            break
        i += 1

RE.run(adaptive_scan(motor, det, 0.2))

Emitted RunStart:
{'beamline_id': 'test', 'uid': 'c0f0e5ba-ebbe-4095-bfe7-6705e965d4ca', 'scan_id': 123, 'time': 1432999378.712136, 'owner': 'tester'}
LOOP 0
set: (mover: motor), ({'pos': 0},), {}
   ret: None
trigger: (mover: motor), (), {}
   ret: None
trigger: (reader: sg), (), {}
   ret: None
read: (reader: sg), (), {}
   ret: {'intensity': {'value': 1.0, 'timetamp': 1432999379.120565}}
LOOP 1
set: (mover: motor), ({'pos': 1},), {}
   ret: None
trigger: (mover: motor), (), {}
   ret: None
trigger: (reader: sg), (), {}
   ret: None
read: (reader: sg), (), {}
   ret: {'intensity': {'value': 0.60653065971263342, 'timetamp': 1432999379.640139}}
LOOP 2
set: (mover: motor), ({'pos': 2},), {}
   ret: None
trigger: (mover: motor), (), {}
   ret: None
trigger: (reader: sg), (), {}
   ret: None
read: (reader: sg), (), {}
   ret: {'intensity': {'value': 0.1353352832366127, 'timetamp': 1432999380.151278}}
DONE
Emitted RunStop:
{'exit_status': 'success', 'run_start': 'c0f0e5ba-ebbe-4095-bfe7-67
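
The adaptive scan relies on a standard Python feature: a value passed to a generator's `send()` method becomes the result of the `yield` expression inside it. The sketch below shows that mechanism in isolation. The `drive` function and the `count_until` plan are hypothetical names used only for illustration; this is not the Run Engine's actual loop.

```python
def drive(plan, respond):
    """Run a generator to completion, sending each response back into it."""
    responses = []
    try:
        message = next(plan)  # advance to the first yield
        while True:
            response = respond(message)
            responses.append(response)
            # send() resumes the generator; the value becomes the result
            # of the `yield` expression inside the plan.
            message = plan.send(response)
    except StopIteration:
        pass  # the plan finished (or was empty)
    return responses


def count_until(limit):
    """Hypothetical plan: keep asking for a reading until it reaches `limit`."""
    i = 0
    while True:
        reading = yield ("read", i)
        if reading >= limit:
            break
        i += 1


# A stand-in for the hardware: the "reading" is twice the requested step.
print(drive(count_until(4), lambda message: message[1] * 2))  # [0, 2, 4]
```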

## Control timing with 'sleep' and 'wait'

The 'sleep' command is as simple as it sounds.

In [10]:
def sleepy_scan(motor, det):
    "Set, trigger motor, sleep for a fixed time, trigger detector, read"
    yield Msg('set', motor, {'pos': 5})
    yield Msg('trigger', motor)
    yield Msg('sleep', None, 2)  # units: seconds
    yield Msg('trigger', det)
    yield Msg('read', det)
    
RE.run(sleepy_scan(motor, det))

Emitted RunStart:
{'owner': 'tester', 'scan_id': 123, 'beamline_id': 'test', 'uid': '7040be31-773a-4170-be40-274d4cff7366', 'time': 1432995082.954654}
set: (mover: motor), ({'pos': 5},), {}
   ret: None
trigger: (mover: motor), (), {}
   ret: None
sleep: (None), (2,), {}
   ret: None
trigger: (reader: sg), (), {}
   ret: None
read: (reader: sg), (), {}
   ret: {'intensity': {'timetamp': 1432995085.475303, 'value': 3.7266531720786709e-06}}
Emitted RunStop:
{'time': 1432995085.683052, 'reason': '', 'exit_status': 'success', 'run_start': '7040be31-773a-4170-be40-274d4cff7366'}


The 'wait' command is more powerful. It watches for Movers (e.g., `motor`) to report being done.

### Wait for one motor to be done moving

In [3]:
def wait_one(motor, det):
    "Set, trigger motor, wait for it to finish, trigger detector, read"
    yield Msg('set', motor, {'pos': 5})
    yield Msg('trigger', motor, block_group='A')  # Add motor to group 'A'.
    yield Msg('wait', None, 'A')  # Wait for everything in group 'A' to report done.
    yield Msg('trigger', det)
    yield Msg('read', det)
    
RE.run(wait_one(motor, det))

Emitted RunStart:
{'scan_id': 123, 'uid': '6ec6a162-0a9f-4500-9b1f-200e2a9be499', 'owner': 'tester', 'time': 1432995419.37463, 'beamline_id': 'test'}
set: (mover: motor), ({'pos': 5},), {}
   ret: None
trigger: (mover: motor), (), {'block_group': 'A'}
   ret: None
wait: (None), ('A',), {}
   ret: {mover: motor}
trigger: (reader: sg), (), {}
   ret: None
read: (reader: sg), (), {}
   ret: {'intensity': {'timetamp': 1432995419.890234, 'value': 3.7266531720786709e-06}}
Emitted RunStop:
{'reason': '', 'run_start': '6ec6a162-0a9f-4500-9b1f-200e2a9be499', 'exit_status': 'success', 'time': 1432995420.096022}


Notice, in the log, that the response to `wait` is the set of Movers the scan was waiting on.
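
One way to picture block groups is a dictionary that maps each group name to the set of objects triggered under it. The sketch below assumes that model; `FakeMover` and `GroupTracker` are hypothetical classes, and the real Run Engine may track completion differently.

```python
import threading
from collections import defaultdict


class FakeMover:
    """Hypothetical mover whose motion finishes after `duration` seconds."""

    def __init__(self, name, duration):
        self.name = name
        self.duration = duration
        self.done = threading.Event()

    def trigger(self):
        # Start the "motion" in the background and return immediately.
        self.done.clear()
        threading.Timer(self.duration, self.done.set).start()


class GroupTracker:
    """Remember which objects belong to each block group."""

    def __init__(self):
        self._groups = defaultdict(set)

    def trigger(self, mover, block_group=None):
        mover.trigger()
        if block_group is not None:
            self._groups[block_group].add(mover)

    def wait(self, block_group):
        # Block until every object in the group reports done, then
        # hand back the set of objects that were waited on.
        movers = self._groups.pop(block_group, set())
        for mover in movers:
            mover.done.wait()
        return movers


tracker = GroupTracker()
m1, m2 = FakeMover("motor1", 0.05), FakeMover("motor2", 0.1)
tracker.trigger(m1, block_group="A")
tracker.trigger(m2, block_group="A")
finished = tracker.wait("A")
print(sorted(m.name for m in finished))  # ['motor1', 'motor2']
```

Because the group is removed once it has been waited on, the same name can be reused later in the run.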

### Wait for two motors to both be done moving

In [4]:
def wait_multiple(motors, det):
    "Set motors, trigger all motors, wait for all motors to move."
    for motor in motors:
        yield Msg('set', motor, {'pos': 5})
        yield Msg('trigger', motor, block_group='A')  # Trigger each motor and add it to group 'A'.
    yield Msg('wait', None, 'A')  # Wait for everything in group 'A' to report done.
    yield Msg('trigger', det)
    yield Msg('read', det)

motor1 = Mover('motor1', ['pos'])
motor2 = Mover('motor2', ['pos'])

RE.run(wait_multiple([motor1, motor2], det))

Emitted RunStart:
{'scan_id': 123, 'uid': '52de6e9d-c762-4197-a5c2-6bd203a0c0c1', 'owner': 'tester', 'time': 1432995426.953379, 'beamline_id': 'test'}
set: (mover: motor1), ({'pos': 5},), {}
   ret: None
trigger: (mover: motor1), (), {'block_group': 'A'}
   ret: None
set: (mover: motor2), ({'pos': 5},), {}
   ret: None
trigger: (mover: motor2), (), {'block_group': 'A'}
   ret: None
wait: (None), ('A',), {}
   ret: {mover: motor2, mover: motor1}
trigger: (reader: sg), (), {}
   ret: None
read: (reader: sg), (), {}
   ret: {'intensity': {'timetamp': 1432995427.771697, 'value': 3.7266531720786709e-06}}
Emitted RunStop:
{'reason': '', 'run_start': '52de6e9d-c762-4197-a5c2-6bd203a0c0c1', 'exit_status': 'success', 'time': 1432995427.976239}


### Advanced Example: Wait for different groups of motors at different points in the run

If the `'A'` bit seems pointless, the payoff is here. We trigger all the motors at once, wait for the first two, read, wait for the last one, and read again. This is merely meant to show that complex control flow is possible.

In [5]:
def wait_complex(motors, det):
    "Set and trigger all motors, wait for group 'A', read, wait for group 'B', read."
    # Same as above...
    for motor in motors[:-1]:
        yield Msg('set', motor, {'pos': 5})
        yield Msg('trigger', motor, block_group='A')
        
    # ...but put the last motor in a separate group.
    yield Msg('set', motors[-1], {'pos': 5})
    yield Msg('trigger', motors[-1], block_group='B')
    
    yield Msg('wait', None, 'A')  # Wait for everything in group 'A' to report done.
    yield Msg('trigger', det)
    yield Msg('read', det)
    
    yield Msg('wait', None, 'B')  # Wait for everything in group 'B' to report done.
    yield Msg('trigger', det)
    yield Msg('read', det)
    
motor3 = Mover('motor3', ['pos'])

RE.run(wait_complex([motor1, motor2, motor3], det))

Emitted RunStart:
{'scan_id': 123, 'uid': 'c89d40aa-166b-473c-aa62-6cbb158350e3', 'owner': 'tester', 'time': 1432995430.894365, 'beamline_id': 'test'}
set: (mover: motor1), ({'pos': 5},), {}
   ret: None
trigger: (mover: motor1), (), {'block_group': 'A'}
   ret: None
set: (mover: motor2), ({'pos': 5},), {}
   ret: None
trigger: (mover: motor2), (), {'block_group': 'A'}
   ret: None
set: (mover: motor3), ({'pos': 5},), {}
   ret: None
trigger: (mover: motor3), (), {'block_group': 'B'}
   ret: None
wait: (None), ('A',), {}
   ret: {mover: motor2, mover: motor1}
trigger: (reader: sg), (), {}
   ret: None
read: (reader: sg), (), {}
   ret: {'intensity': {'timetamp': 1432995432.017687, 'value': 3.7266531720786709e-06}}
wait: (None), ('B',), {}
   ret: {mover: motor3}
trigger: (reader: sg), (), {}
   ret: None
read: (reader: sg), (), {}
   ret: {'intensity': {'timetamp': 1432995432.324914, 'value': 3.7266531720786709e-06}}
Emitted RunStop:
{'reason': '', 'run_start': 'c89d40aa-166b-473c-aa62

## Runs can be aborted

### SIGINT (Ctrl+C) is reliably caught before each message is processed, even across threads.

The output below is truncated because the run caught Ctrl+C (or, in the notebook, "Interrupt Kernel").
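
A common pattern for this kind of behavior is a signal handler that only sets a flag, which the processing loop checks before each message. The sketch below assumes that pattern; `InterruptibleRunner` is a hypothetical class, not the Run Engine's implementation, and the interrupt is simulated by calling the handler directly rather than sending a real signal.

```python
import signal


class InterruptibleRunner:
    """Hypothetical runner that checks for an interrupt before each message."""

    def __init__(self):
        self._interrupted = False

    def handle_sigint(self, signum, frame):
        # The handler only records the request; the loop decides when to stop.
        self._interrupted = True

    def install(self):
        # signal.signal() may only be called from the main thread.
        signal.signal(signal.SIGINT, self.handle_sigint)

    def run(self, messages):
        self._interrupted = False
        processed = []
        for message in messages:
            if self._interrupted:
                return processed, "abort"
            processed.append(message)
        return processed, "success"


def plan(runner):
    yield "set"
    yield "trigger"
    # Simulate Ctrl+C arriving while the plan prepares its next message.
    runner.handle_sigint(signal.SIGINT, None)
    yield "read"


runner = InterruptibleRunner()
print(runner.run(plan(runner)))  # (['set', 'trigger'], 'abort')
```

Deferring the decision to the loop means a message is never abandoned halfway through; the run stops cleanly at a message boundary.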

In [17]:
RE.run(simple_scan(motor))

Emitted RunStart:
{'owner': 'tester', 'uid': '4df1da7d-f5ec-4d24-ac67-6312cc939b1a', 'scan_id': 123, 'time': 1432990069.884299, 'beamline_id': 'test'}
set: (mover: motor), ({'pos': 5},), {}
   response: None
trigger: (mover: motor), (), {}
   response: None


### Threading is optional -- switch it off for easier debugging

Again, we'll interrupt the scan. We get exactly the same result, but this time we see a full Traceback.

In [43]:
RE.run(simple_scan(motor), use_threading=False)

Emitted RunStart:
{'beamline_id': 'test', 'uid': '0877e421-a308-49c1-a489-876440d6fdb2', 'scan_id': 123, 'time': 1433000200.57927, 'owner': 'tester'}
set: (mover: motor), ({'pos': 5},), {}
   ret: None
Emitted RunStop:
{'exit_status': 'abort', 'run_start': '0877e421-a308-49c1-a489-876440d6fdb2', 'reason': '', 'time': 1433000200.768894}


RunInterrupt: RunEngine detected a SIGINT (Ctrl+C) and aborted the scan. Records were created, but the run was marked with exit_status='abort'.

## Data can be consumed live by user-defined functions

In the examples above, the runs have been emitting RunStart and RunStop Documents, but no Events or Event Descriptors. We will add those now.

### Emitting Events and Event Descriptors

The `'create'` and `'save'` commands collect all the reads between them into one Event.

If that particular set of objects has never been bundled into an Event during this run, then an Event Descriptor is also created.

All four Documents -- RunStart, RunStop, Event, and EventDescriptor -- are simply Python dictionaries.
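
The bundling rule can be sketched as follows. This is an illustration under stated assumptions, not the Run Engine's code: the `Bundler` class is hypothetical, the document fields are simplified, and a descriptor is keyed on the set of object names that were read.

```python
import time
import uuid


class Bundler:
    """Hypothetical bundler: reads between create() and save() form one Event."""

    def __init__(self, run_start_uid):
        self.run_start_uid = run_start_uid
        self._descriptors = {}  # frozenset of object names -> descriptor uid
        self._seq_nums = {}     # descriptor uid -> last sequence number
        self._reads = None      # None means no bundle is open

    def create(self):
        self._reads = {}

    def read(self, obj_name, reading):
        # Reads outside a create/save pair are not bundled.
        if self._reads is not None:
            self._reads[obj_name] = reading

    def save(self):
        """Return the (name, document) pairs emitted by this save."""
        if self._reads is None:
            raise RuntimeError("save() called without create()")
        emitted = []
        key = frozenset(self._reads)
        if key not in self._descriptors:
            # First time this set of objects is bundled during this run.
            descriptor = {
                "uid": str(uuid.uuid4()),
                "run_start": self.run_start_uid,
                "data_keys": {
                    field: {"source": name}
                    for name, reading in self._reads.items()
                    for field in reading
                },
                "time": time.time(),
            }
            self._descriptors[key] = descriptor["uid"]
            self._seq_nums[descriptor["uid"]] = 0
            emitted.append(("descriptor", descriptor))
        descriptor_uid = self._descriptors[key]
        self._seq_nums[descriptor_uid] += 1
        data = {}
        for reading in self._reads.values():
            data.update(reading)
        event = {
            "uid": str(uuid.uuid4()),
            "descriptor": descriptor_uid,
            "seq_num": self._seq_nums[descriptor_uid],
            "data": data,
            "time": time.time(),
        }
        emitted.append(("event", event))
        self._reads = None
        return emitted


bundler = Bundler(run_start_uid="example-run")
for position in (5, 6):
    bundler.create()
    bundler.read("motor", {"pos": {"value": position, "timestamp": time.time()}})
    print([name for name, _ in bundler.save()])
# ['descriptor', 'event']
# ['event']
```

The second save emits only an Event because the same set of objects was already described.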

In [30]:
def simple_scan_saving(motor):
    "Set, trigger, read"
    yield Msg('create')
    yield Msg('set', motor, {'pos': 5})
    yield Msg('trigger', motor)
    yield Msg('read', motor)
    yield Msg('save')
    
RE.run(simple_scan_saving(motor))

Emitted RunStart:
{'beamline_id': 'test', 'uid': '8c032f0d-a100-40aa-bd3b-3c3adbf659cc', 'scan_id': 123, 'time': 1432999560.570444, 'owner': 'tester'}
create: (None), (), {}
   ret: None
set: (mover: motor), ({'pos': 5},), {}
   ret: None
trigger: (mover: motor), (), {}
   ret: None
read: (mover: motor), (), {}
   ret: {'pos': {'timestamp': 1432999560.988787, 'value': 5}}
Emitted Event Descriptor:
{'uid': '564922b0-c1de-4426-b351-62ff081fb235', 'run_start': '8c032f0d-a100-40aa-bd3b-3c3adbf659cc', 'data_keys': {'pos': {'dtype': 'number', 'source': 'motor'}}, 'time': 1432999561.19379}
Emitted Event:
{'seq_num': 1, 'uid': 'c3984734-05b0-488f-bbc3-4bf4eee1cb2d', 'data': {'pos': {'timestamp': 1432999560.988787, 'value': 5}}, 'time': 1432999561.193961, 'descriptor': '564922b0-c1de-4426-b351-62ff081fb235'}
save: (None), (), {}
   ret: None
Emitted RunStop:
{'exit_status': 'success', 'run_start': '8c032f0d-a100-40aa-bd3b-3c3adbf659cc', 'reason': '', 'time': 1432999561.297982}


### Consuming Documents for Live Visualization and Analysis

Any user function that accepts a Python dictionary can be registered as a "consumer" of these Event Documents. Here's a toy example.

In [31]:
def print_event_time(doc):
    print('EVENT TIME:', doc['time'])

To use this consumer function during a run:

In [35]:
RE.run(simple_scan_saving(motor), subscriptions={'event': print_event_time})

Emitted RunStart:
{'beamline_id': 'test', 'uid': '812c95cc-34fa-461b-ae75-8b5382eed05e', 'scan_id': 123, 'time': 1432999567.681772, 'owner': 'tester'}
create: (None), (), {}
   ret: None
set: (mover: motor), ({'pos': 5},), {}
   ret: None
trigger: (mover: motor), (), {}
   ret: None
read: (mover: motor), (), {}
   ret: {'pos': {'timestamp': 1432999568.091286, 'value': 5}}
Emitted Event Descriptor:
{'uid': 'b3466a6d-fe04-440a-be9f-47c620ed8111', 'run_start': '812c95cc-34fa-461b-ae75-8b5382eed05e', 'data_keys': {'pos': {'dtype': 'number', 'source': 'motor'}}, 'time': 1432999568.301414}
Emitted Event:
{'seq_num': 1, 'uid': '822b802e-43de-4c7e-977d-5b93a8cf6e0c', 'data': {'pos': {'timestamp': 1432999568.091286, 'value': 5}}, 'time': 1432999568.301582, 'descriptor': 'b3466a6d-fe04-440a-be9f-47c620ed8111'}
save: (None), (), {}
   ret: None
EVENT TIME: 1432999568.301582
Emitted RunStop:
{'exit_status': 'success', 'run_start': '812c95cc-34fa-461b-ae75-8b5382eed05e', 'reason': '', 'time': 14329

To use it by default on every run for this instance of the Run Engine:

In [32]:
token = RE.subscribe('event', print_event_time)
token

1

The output `token`, an integer, can be used to unsubscribe later.

In [34]:
RE.unsubscribe(token)
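
Token-based subscription can be pictured as a small registry that hands out increasing integers. The sketch below is an assumption about how such a registry might work, not the Run Engine's code; `Dispatcher` and its `emit` method are hypothetical names.

```python
import itertools


class Dispatcher:
    """Hypothetical registry mapping integer tokens to (name, function) pairs."""

    def __init__(self):
        self._tokens = itertools.count(1)
        self._subscriptions = {}

    def subscribe(self, name, func):
        token = next(self._tokens)
        self._subscriptions[token] = (name, func)
        return token

    def unsubscribe(self, token):
        # Removing an unknown token is treated as a no-op in this sketch.
        self._subscriptions.pop(token, None)

    def emit(self, name, doc):
        # Iterate over a copy so callbacks may unsubscribe during dispatch.
        for sub_name, func in list(self._subscriptions.values()):
            if sub_name == name:
                func(doc)


seen = []
dispatcher = Dispatcher()
token = dispatcher.subscribe("event", seen.append)
dispatcher.emit("event", {"time": 1.0})
dispatcher.emit("stop", {"time": 2.0})   # ignored: no subscriber for 'stop'
dispatcher.unsubscribe(token)
dispatcher.emit("event", {"time": 3.0})  # ignored: already unsubscribed
print(token, seen)  # 1 [{'time': 1.0}]
```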

Mission-critical consumers can be run on the scan thread, where they will block the scan until they return. This should not be used for computationally heavy tasks like visualization.

In [None]:
RE._register_scan_callback('event', critical_func)

### Saving Documents to Metadatastore

The convenience function `register_mds` registers metadatastore's four `insert_*` functions to consume their four respective documents. These are registered on the scan thread, so data is guaranteed to be saved in metadatastore.

In [36]:
%run register_mds.py

register_mds(RE)

We can verify that this worked by loading this one-point scan from the DataBroker and displaying the data using DataMuxer.

In [38]:
RE.run(simple_scan_saving(motor))

Emitted RunStart:
{'beamline_id': 'test', 'uid': 'ef0d61fb-b740-4986-98e9-e5f8949fc37e', 'scan_id': 123, 'time': 1432999913.124614, 'owner': 'tester'}
create: (None), (), {}
   ret: None
set: (mover: motor), ({'pos': 5},), {}
   ret: None
trigger: (mover: motor), (), {}
   ret: None
read: (mover: motor), (), {}
   ret: {'pos': {'timestamp': 1432999913.530245, 'value': 5}}
Emitted Event Descriptor:
{'uid': '13cd66e7-ceb6-43dc-88aa-c00768eaecf8', 'run_start': 'ef0d61fb-b740-4986-98e9-e5f8949fc37e', 'data_keys': {'pos': {'dtype': 'number', 'source': 'motor'}}, 'time': 1432999913.735501}
Emitted Event:
{'seq_num': 1, 'uid': '228c2c9c-5d8a-4395-9a67-21352c5d5aca', 'data': {'pos': {'timestamp': 1432999913.530245, 'value': 5}}, 'time': 1432999913.742596, 'descriptor': '13cd66e7-ceb6-43dc-88aa-c00768eaecf8'}
save: (None), (), {}
   ret: None
Emitted RunStop:
{'exit_status': 'success', 'run_start': 'ef0d61fb-b740-4986-98e9-e5f8949fc37e', 'reason': '', 'time': 1432999913.848467}


In [39]:
from dataportal import DataBroker as db

header = db[-1]
header

field,value
beamline_config.config_params,
beamline_config.time,1432999913.124705
beamline_config.time_as_datetime,2015-05-30 11:31:53.124705 (27 seconds ago)
beamline_config.uid,218d8383-67fc-4efd-9db0-bbe36cf08bf2
beamline_id,test
event_descriptors[0].data_keys.pos.dtype,number
event_descriptors[0].data_keys.pos.external,None
event_descriptors[0].data_keys.pos.shape,
event_descriptors[0].data_keys.pos.source,motor
event_descriptors[0].run_start,ef0d61fb-b740-4986-98e9-e5f8949fc37e
event_descriptors[0].time,1432999913.735501
event_descriptors[0].time_as_datetime,2015-05-30 11:31:53.735501 (26 seconds ago)
event_descriptors[0].uid,13cd66e7-ceb6-43dc-88aa-c00768eaecf8
exit_reason,
exit_status,success
group,
owner,tester
project,
run_start_uid,ef0d61fb-b740-4986-98e9-e5f8949fc37e
run_stop_uid,2455cf05-1239-4e93-a470-e56d4818ad17


In [42]:
from dataportal import DataMuxer as dm

dm.from_events(db.fetch_events(header)).to_sparse_dataframe()

Unnamed: 0,pos,time
0,5,2015-05-30 15:31:53.742596
