# Intake for Bluesky

## Setup: Acquire some sample data.

For data acquisition (but not for data access!) we assume that we have direct access to MongoDB (or some message queue that has a sink into MongoDB).

In [26]:
from bluesky import RunEngine
from intake_bluesky import MongoInsertCallback
from bluesky.plans import scan
from bluesky.preprocessors import SupplementalData
from ophyd.sim import motor, det, direct_img, img

RE = RunEngine({})
sd = SupplementalData(baseline=[motor])
RE.preprocessors.append(sd)

# This is just a simple callback that does MongoDB insert_one. No databroker.
metadatastore_uri = 'mongodb://localhost:27017/test1'
assets_uri = 'mongodb://localhost:27017/test1'
insert = MongoInsertCallback(metadatastore_uri, assets_uri)
RE.subscribe(insert)


uid, = RE(scan([det], motor, -1, 1, 20))
direct_img_uid, = RE(scan([direct_img], motor, -1, 1, 20))
img_uid, = RE(scan([img], motor, -1, 1, 20))

## Open an intake Catalog.

We could use intake to access the data _directly_ like this, though we will probably never do so at NSLS-II.

In [27]:
from intake_bluesky import MongoMetadataStoreCatalog

mds = MongoMetadataStoreCatalog(metadatastore_uri)
mds

<Intake catalog: mongodb://localhost:27017/test1>

In [29]:
mds[img_uid].primary.read()

NotImplementedError: 

In [4]:
mds[direct_img_uid].primary().read()

<xarray.Dataset>
Dimensions:                   (dim_0: 10, dim_1: 10, time: 20)
Coordinates:
  * time                      (time) float64 1.544e+09 1.544e+09 ... 1.544e+09
Dimensions without coordinates: dim_0, dim_1
Data variables:
    img                       (time, dim_0, dim_1) float64 1.0 1.0 ... 1.0 1.0
    motor                     (time) float64 -1.0 -0.8947 -0.7895 ... 0.8947 1.0
    motor_setpoint            (time) float64 -1.0 -0.8947 -0.7895 ... 0.8947 1.0
    img:img                   (time, dim_0, dim_1) float64 1.0 1.0 ... 1.0 1.0
    motor:motor_velocity      (time) int64 1 1 1 1 1 1 1 1 1 ... 1 1 1 1 1 1 1 1
    motor:motor_acceleration  (time) int64 1 1 1 1 1 1 1 1 1 ... 1 1 1 1 1 1 1 1
    seq_num                   (time) int64 1 2 3 4 5 6 7 ... 15 16 17 18 19 20

Instead we will access data through an HTTP service. We will start an intake server like this:

```
intake-server facility_catalog.yml
```

where `facility_catalog.yml` encodes the MongoDB ``uri`` above, and potentially many such URIs:

In [5]:
%cat facility_catalog.yml

plugins:
  source:
    - module: intake_bluesky
sources:
  xyz:
    description: Some imaginary beamline
    driver: mongo_metadatastore
    container: catalog
    args:
      uri: mongodb://localhost:27017/test1
    metadata:
      beamline: "00-ID"


In [6]:
import intake

facility_catalog = intake.Catalog("intake://localhost:5000", page_size=100)
facility_catalog

<Intake catalog: None>

A Catalog contains entries, which we can access by iteration:

```
for entry in catalog:
    ...
```

or individually by name:

```
entry = catalog[entry_name]
```

For small Catalogs, it is convenient to ``list`` their contents.

In [7]:
list(facility_catalog)

['xyz']

The ``facility_catalog`` contains a catalog for each beamline. Let's access the ``xyz`` entry, which is also a Catalog.

In [8]:
cat = facility_catalog['xyz']()
cat

<Intake catalog: xyz>

In [9]:
cat[uid]

<Catalog Entry: caad3b8d-6507-444e-9e4c-6ab3c2e00f77>

Each entry in this Catalog represents one scan. There are too many to list them all. (We could _try_ but it would take a long time and probably run out of memory.)
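When a Catalog is too large to list, one option is to iterate and stop early rather than build the whole list. The sketch below uses a hypothetical generator, `iter_uids`, as a stand-in for a large Catalog. It assumes that iterating the real Catalog is similarly lazy, which the `page_size` argument above suggests but this notebook does not demonstrate.

```python
from itertools import islice


def iter_uids(total):
    """Hypothetical stand-in for iterating a large Catalog, one name at a time."""
    for i in range(total):
        yield f'uid-{i:06d}'


# Take only the first five names; the remaining names are never generated.
first_five = list(islice(iter_uids(10_000_000), 5))
print(first_five)
```

With a real Catalog, the equivalent would presumably be `islice(cat, 5)`.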

We can find scans of interest in a couple of ways.

## Progressive Search

We can search ``cat`` by passing it a Mongo Query. The result is another Catalog, with a subset of the entries in ``cat``.

In [10]:
search_results = cat.search({'plan_name': 'scan'})
search_results

<Intake catalog: None>

We can search progressively, generating yet another Catalog.

In [11]:
import time
recent_counts = search_results.search({'time': {'$gt': time.time() - 60 * 60 * 24}})
recent_counts

<Intake catalog: None>
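One way to picture progressive search is as repeated filtering: each search narrows the previous result, so two successive searches select the same entries as one query that carries both conditions. The sketch below illustrates this with in-memory dictionaries and a tiny matcher that understands only equality and `$gt`. The functions `matches` and `search` and the sample documents are hypothetical; this is not how intake_bluesky or MongoDB is implemented.

```python
import time


def matches(doc, query):
    """Return True if `doc` satisfies a tiny subset of Mongo query syntax."""
    for key, condition in query.items():
        value = doc.get(key)
        if isinstance(condition, dict):
            # Only the `$gt` operator is handled in this sketch.
            for op, operand in condition.items():
                if op != '$gt':
                    raise NotImplementedError(op)
                if value is None or not value > operand:
                    return False
        elif value != condition:
            return False
    return True


def search(docs, query):
    """Hypothetical stand-in for Catalog.search: return the matching subset."""
    return [doc for doc in docs if matches(doc, query)]


now = time.time()
one_day = 60 * 60 * 24
start_docs = [
    {'uid': 'a', 'plan_name': 'scan', 'time': now - 10},
    {'uid': 'b', 'plan_name': 'count', 'time': now - 20},
    {'uid': 'c', 'plan_name': 'scan', 'time': now - 2 * one_day},
]

# Two successive searches, as in the cells above.
scans = search(start_docs, {'plan_name': 'scan'})
recent_scans = search(scans, {'time': {'$gt': now - one_day}})

# The same subset in one step: a query with several keys must match on all of them.
one_step = search(start_docs, {'plan_name': 'scan',
                               'time': {'$gt': now - one_day}})

print([doc['uid'] for doc in recent_scans])  # ['a']
```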

Having narrowed the results to a small Catalog, we can list them.

In [12]:
list(recent_counts)

['59110e2b-52d1-4c52-9bff-53a56ed13d2d',
 'caad3b8d-6507-444e-9e4c-6ab3c2e00f77',
 'e92cc2d5-d437-435c-a480-385327c525f4',
 '14aadf7f-99cc-415a-a1a5-1b0607830b5c',
 '7d48ccaa-39c8-4371-9808-140c4e6fbfb7',
 'aaa458f8-b21b-468a-abcb-15957aef2693',
 'cdc3861c-8e44-4901-8839-78e38058bc2b',
 'd84eb3b1-b2c4-48b5-9d6c-ee75ea91a347',
 '3d052dc2-608a-415e-ac59-7b520a2366cd',
 '0237a0fa-253e-4eaf-ba03-c44ce3364384',
 '1b60fe9e-d297-49bf-aaab-1201dbe6f610',
 'd14e4dae-ba47-41b6-b823-8d3bb42a8934',
 '9affeed8-827f-443a-953e-8ebbb6d73568',
 '90663b52-85f1-4518-be95-73407035b339',
 '1736582e-b9c6-400b-8b37-ed5de75096b3',
 '242cf823-917c-42f9-8803-fd57a72b2405',
 '37f2c897-4cd9-4189-8b08-8b10efae6b42',
 '26e019e4-89eb-4ca9-a26c-d00c2b005605',
 'd9b965cd-e5e4-4595-beaa-c7027d70fef5',
 '7fd486c7-c4a1-4a88-8fb0-65db7ae7b200',
 'ffd88a74-b44f-46ed-9a24-b3691cb65471',
 '73d3f963-c796-4f84-abfe-630e68d4b185',
 '8e57a261-e569-4977-a524-22a5573b2b89',
 '87344118-b30f-4456-b71a-561aa471cebf',
 '4c52d958-885f-

## Random access by unique ID (`uid`), recency, and `scan_id`

We can access entries by their unique ID "name" as in:

In [13]:
entry = cat[uid]  # uid we captured above during data acquisition
entry

<Catalog Entry: caad3b8d-6507-444e-9e4c-6ab3c2e00f77>

We can also access entries by *recency* with this syntactic sugar:

In [14]:
recent_counts[-1]

<Catalog Entry: -1>

A positive integer matches the most recent entry with the corresponding ``scan_id`` (which is not necessarily globally unique!).

In [15]:
cat[3]

<Catalog Entry: 3>

Both of these "tricks" are _not_ general features of intake Catalogs, but as shown we can support them, for the sake of convenience and of continuity with databroker usage patterns.
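The lookup rules described in this section can be sketched with a plain list of run-start documents. The function `lookup` and the sample documents are hypothetical and only illustrate the dispatch on key type and sign; the real Catalog may resolve keys differently.

```python
def lookup(start_docs, key):
    """Resolve `key` against a list of run-start documents (illustrative only)."""
    by_time = sorted(start_docs, key=lambda doc: doc['time'])
    if isinstance(key, str):
        # A string is treated as a unique ID.
        for doc in by_time:
            if doc['uid'] == key:
                return doc
        raise KeyError(key)
    if key < 0:
        # Negative integers count back from the most recent run.
        try:
            return by_time[key]
        except IndexError:
            raise KeyError(key) from None
    # Other integers: the most recent run with a matching scan_id.
    for doc in reversed(by_time):
        if doc['scan_id'] == key:
            return doc
    raise KeyError(key)


docs = [
    {'uid': 'old', 'scan_id': 3, 'time': 100.0},
    {'uid': 'mid', 'scan_id': 4, 'time': 200.0},
    {'uid': 'new', 'scan_id': 3, 'time': 300.0},  # scan_id 3 is reused
]

print(lookup(docs, -1)['uid'])     # new: the most recent run
print(lookup(docs, 3)['uid'])      # new: the most recent run with scan_id 3
print(lookup(docs, 'old')['uid'])  # old: an exact uid match
```

Because `scan_id` 3 appears twice, only the uid can still reach the older run.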

## Metadata

The entry's metadata is available via ``entry.metadata``. Notice that this includes ``entry.metadata.start`` and ``entry.metadata.stop``, the documents generated at the beginning and end of the corresponding scan.

In [16]:
entry.metadata

{'start': {'uid': 'caad3b8d-6507-444e-9e4c-6ab3c2e00f77',
  'time': 1544027929.8677351,
  'scan_id': 1,
  'plan_type': 'generator',
  'plan_name': 'scan',
  'detectors': ['det'],
  'motors': ['motor'],
  'num_points': 20,
  'num_intervals': 19,
  'plan_args': {'detectors': ["SynGauss(name='det', value=1.0, timestamp=1544027929.848608)"],
   'num': 20,
   'args': ["SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])",
    -1,
    1],
   'per_step': 'None'},
  'hints': {'dimensions': [[['motor'], 'primary']]},
  'plan_pattern': 'inner_product',
  'plan_pattern_module': 'bluesky.plan_patterns',
  'plan_pattern_args': {'num': 20,
   'args': ["SynAxis(prefix='', name='motor', read_attrs=['readback', 'setpoint'], configuration_attrs=['velocity', 'acceleration'])",
    -1,
    1]}},
 'stop': {'run_start': 'caad3b8d-6507-444e-9e4c-6ab3c2e00f77',
  'time': 1544027929.938302,
  'uid': 'b2adced4-5b33-4c2c-a5f6-2083d07f7be2',
  'e
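Because the start and stop documents are plain dictionaries, simple bookkeeping needs nothing beyond the standard library. The sketch below copies a few values from the output above into a literal dictionary, so it runs without a Catalog, and computes how long the run lasted. It assumes the `time` fields are seconds since the Unix epoch.

```python
from datetime import datetime, timezone

# Values copied from the `entry.metadata` output above.
metadata = {
    'start': {'uid': 'caad3b8d-6507-444e-9e4c-6ab3c2e00f77',
              'time': 1544027929.8677351},
    'stop': {'run_start': 'caad3b8d-6507-444e-9e4c-6ab3c2e00f77',
             'time': 1544027929.938302},
}

# The stop document refers back to its start document by uid.
assert metadata['stop']['run_start'] == metadata['start']['uid']

# Elapsed wall-clock time between the two documents, in seconds.
duration = metadata['stop']['time'] - metadata['start']['time']
started_at = datetime.fromtimestamp(metadata['start']['time'], tz=timezone.utc)
print(f"run started {started_at.isoformat()} and lasted {duration:.3f} s")
```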

## Accessing Data

Calling an Entry like `entry()` or equivalently `entry.get()` returns the DataSource for that Entry. The DataSource corresponding to one scan is itself a Catalog, named after the scan's `uid`.

In [17]:
entry()

<Intake catalog: caad3b8d-6507-444e-9e4c-6ab3c2e00f77>

That Catalog has one entry for each stream of data captured during that scan. (Typically there is a ``'primary'`` stream and potentially others, but this is just a convention.) As with all Catalogs, we can look at its contents.

In [18]:
list(entry())

['baseline', 'primary']

We can pull the data from the 'primary' stream all at once:

In [19]:
entry().primary().read()

<xarray.Dataset>
Dimensions:                   (time: 20)
Coordinates:
  * time                      (time) float64 1.544e+09 1.544e+09 ... 1.544e+09
Data variables:
    det                       (time) float64 0.6065 0.6701 ... 0.6701 0.6065
    det:det                   (time) float64 0.6065 0.6065 ... 0.6065 0.6065
    motor                     (time) float64 -1.0 -0.8947 -0.7895 ... 0.8947 1.0
    motor:motor_acceleration  (time) int64 1 1 1 1 1 1 1 1 1 ... 1 1 1 1 1 1 1 1
    motor:motor_velocity      (time) int64 1 1 1 1 1 1 1 1 1 ... 1 1 1 1 1 1 1 1
    motor_setpoint            (time) float64 -1.0 -0.8947 -0.7895 ... 0.8947 1.0
    seq_num                   (time) int64 1 2 3 4 5 6 7 ... 15 16 17 18 19 20

At this point, we have "left" intake. We have an ordinary `xarray.Dataset` object, which we can use to do any further slicing or drilling down. This `Dataset` contains numpy arrays. Alternatively, we can ask intake for a `Dataset` of _dask_ arrays, which will defer pulling the data from the server until it is called upon to compute a result.

In [20]:
entry().primary().to_dask()  # an xarray of dask.arrays

<xarray.Dataset>
Dimensions:                   (time: 20)
Coordinates:
  * time                      (time) float64 1.544e+09 1.544e+09 ... 1.544e+09
Data variables:
    det                       (time) float64 dask.array<shape=(20,), chunksize=(20,)>
    det:det                   (time) float64 dask.array<shape=(20,), chunksize=(20,)>
    motor                     (time) float64 dask.array<shape=(20,), chunksize=(20,)>
    motor:motor_acceleration  (time) int64 dask.array<shape=(20,), chunksize=(20,)>
    motor:motor_velocity      (time) int64 dask.array<shape=(20,), chunksize=(20,)>
    motor_setpoint            (time) float64 dask.array<shape=(20,), chunksize=(20,)>
    seq_num                   (time) int64 dask.array<shape=(20,), chunksize=(20,)>

For example, converting the `xarray.Dataset` to a `pandas.DataFrame` will prompt dask to materialize the data:

In [21]:
entry().primary().to_dask().to_dataframe()

| time | det | det:det | motor | motor:motor_acceleration | motor:motor_velocity | motor_setpoint | seq_num |
|---|---|---|---|---|---|---|---|
| 1544028000.0 | 0.606531 | 0.606531 | -1.0 | 1 | 1 | -1.0 | 1 |
| 1544028000.0 | 0.670134 | 0.606531 | -0.894737 | 1 | 1 | -0.894737 | 2 |
| 1544028000.0 | 0.732249 | 0.606531 | -0.789474 | 1 | 1 | -0.789474 | 3 |
| 1544028000.0 | 0.791305 | 0.606531 | -0.684211 | 1 | 1 | -0.684211 | 4 |
| 1544028000.0 | 0.8457 | 0.606531 | -0.578947 | 1 | 1 | -0.578947 | 5 |
| 1544028000.0 | 0.893876 | 0.606531 | -0.473684 | 1 | 1 | -0.473684 | 6 |
| 1544028000.0 | 0.934385 | 0.606531 | -0.368421 | 1 | 1 | -0.368421 | 7 |
| 1544028000.0 | 0.965967 | 0.606531 | -0.263158 | 1 | 1 | -0.263158 | 8 |
| 1544028000.0 | 0.987612 | 0.606531 | -0.157895 | 1 | 1 | -0.157895 | 9 |
| 1544028000.0 | 0.998616 | 0.606531 | -0.052632 | 1 | 1 | -0.052632 | 10 |
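The difference between `read()` and `to_dask()` is one of timing: the former fetches immediately, the latter builds a placeholder and fetches only when a result is demanded. The toy class below illustrates that deferral in plain Python. `LazyColumn` and `fetch_det` are hypothetical names, and nothing here reflects how dask or intake are actually implemented.

```python
class LazyColumn:
    """Toy stand-in for a deferred array: it fetches only when asked to compute."""

    def __init__(self, fetch):
        self._fetch = fetch

    def compute(self):
        return self._fetch()


fetch_log = []


def fetch_det():
    """Pretend to pull a column from a server, recording that a fetch happened."""
    fetch_log.append('det')
    return [0.6065, 0.6701, 0.7322]


lazy = LazyColumn(fetch_det)
print(fetch_log)  # []: building the lazy object fetched nothing

values = lazy.compute()
print(fetch_log)  # ['det']: the fetch happened only on compute()
```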


We can look at the data from the other stream, 'baseline'.

In [23]:
entry().baseline().read()

<xarray.Dataset>
Dimensions:                   (time: 2)
Coordinates:
  * time                      (time) float64 1.544e+09 1.544e+09
Data variables:
    motor                     (time) float64 0.0 1.0
    motor:motor_acceleration  (time) int64 1 1
    motor:motor_velocity      (time) int64 1 1
    motor_setpoint            (time) float64 0.0 1.0
    seq_num                   (time) int64 1 2

Or merge all the streams together into one `xarray.Dataset`:

In [24]:
import xarray

xarray.merge(entry()[key].read() for key in entry())

<xarray.Dataset>
Dimensions:                   (time: 22)
Coordinates:
  * time                      (time) float64 1.544e+09 1.544e+09 ... 1.544e+09
Data variables:
    motor                     (time) float64 0.0 -1.0 -0.8947 ... 0.8947 1.0 1.0
    motor:motor_acceleration  (time) float64 1.0 1.0 1.0 1.0 ... 1.0 1.0 1.0 1.0
    motor:motor_velocity      (time) float64 1.0 1.0 1.0 1.0 ... 1.0 1.0 1.0 1.0
    motor_setpoint            (time) float64 0.0 -1.0 -0.8947 ... 0.8947 1.0 1.0
    seq_num                   (time) float64 1.0 1.0 2.0 3.0 ... 19.0 20.0 2.0
    det                       (time) float64 nan 0.6065 0.6701 ... 0.6065 nan
    det:det                   (time) float64 nan 0.6065 0.6065 ... 0.6065 nan

which creates a "block matrix" sorted on time, clearly visible when cast into a DataFrame:

In [25]:
xarray.merge(entry()[key].read() for key in entry()).to_dataframe()

| time | motor | motor:motor_acceleration | motor:motor_velocity | motor_setpoint | seq_num | det | det:det |
|---|---|---|---|---|---|---|---|
| 1544027000.0 | 0.0 | 1.0 | 1.0 | 0.0 | 1.0 | | |
| 1544027000.0 | -1.0 | 1.0 | 1.0 | -1.0 | 1.0 | 0.606531 | 0.606531 |
| 1544027000.0 | -0.894737 | 1.0 | 1.0 | -0.894737 | 2.0 | 0.670134 | 0.606531 |
| 1544027000.0 | -0.789474 | 1.0 | 1.0 | -0.789474 | 3.0 | 0.732249 | 0.606531 |
| 1544027000.0 | -0.684211 | 1.0 | 1.0 | -0.684211 | 4.0 | 0.791305 | 0.606531 |
| 1544027000.0 | -0.578947 | 1.0 | 1.0 | -0.578947 | 5.0 | 0.8457 | 0.606531 |
| 1544027000.0 | -0.473684 | 1.0 | 1.0 | -0.473684 | 6.0 | 0.893876 | 0.606531 |
| 1544027000.0 | -0.368421 | 1.0 | 1.0 | -0.368421 | 7.0 | 0.934385 | 0.606531 |
| 1544027000.0 | -0.263158 | 1.0 | 1.0 | -0.263158 | 8.0 | 0.965967 | 0.606531 |
| 1544027000.0 | -0.157895 | 1.0 | 1.0 | -0.157895 | 9.0 | 0.987612 | 0.606531 |
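The "block matrix" shape comes from an outer join on time: every timestamp from every stream gets a row, and a column that a stream did not record at that time is padded with NaN. The sketch below reproduces the idea with plain dictionaries. The function `outer_merge` and the sample values are hypothetical, and the sketch assumes timestamps never collide between streams. The padding is also a plausible reason the integer columns appear as float64 in the merged output above, since NaN is a float.

```python
import math


def outer_merge(*streams):
    """Outer-join streams given as {time: {column: value}} mappings."""
    columns = sorted({name for stream in streams
                      for row in stream.values() for name in row})
    times = sorted({t for stream in streams for t in stream})
    merged = {}
    for t in times:
        # Start every row fully padded, then fill in what each stream recorded.
        row = {name: math.nan for name in columns}
        for stream in streams:
            row.update(stream.get(t, {}))
        merged[t] = row
    return columns, merged


# Baseline is read before and after the scan; primary is read during it.
baseline = {100.0: {'motor': 0.0}, 103.0: {'motor': 1.0}}
primary = {101.0: {'motor': -1.0, 'det': 0.6065},
           102.0: {'motor': 1.0, 'det': 0.6065}}

columns, merged = outer_merge(baseline, primary)
for t, row in merged.items():
    print(t, [row[name] for name in columns])
```

The first and last rows come from the baseline stream and carry NaN in the `det` column, matching the pattern in the table above.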


## N-dimensional Data (e.g. images)

Higher-dimensional data does not have to be treated specially. It can sit in an `xarray.Dataset` as well. As above, we can use `read()` to fetch the data immediately or `to_dask()` to fetch it lazily.

In [26]:
entry = cat[direct_img_uid]  # uid captured during data acquisition above
dataset = entry().primary().read()
dataset

<xarray.Dataset>
Dimensions:                   (dim_0: 10, dim_1: 10, time: 20)
Coordinates:
  * time                      (time) float64 1.544e+09 1.544e+09 ... 1.544e+09
Dimensions without coordinates: dim_0, dim_1
Data variables:
    img                       (time, dim_0, dim_1) float64 1.0 1.0 ... 1.0 1.0
    img:img                   (time, dim_0, dim_1) float64 1.0 1.0 ... 1.0 1.0
    motor                     (time) float64 -1.0 -0.8947 -0.7895 ... 0.8947 1.0
    motor:motor_acceleration  (time) int64 1 1 1 1 1 1 1 1 1 ... 1 1 1 1 1 1 1 1
    motor:motor_velocity      (time) int64 1 1 1 1 1 1 1 1 1 ... 1 1 1 1 1 1 1 1
    motor_setpoint            (time) float64 -1.0 -0.8947 -0.7895 ... 0.8947 1.0
    seq_num                   (time) int64 1 2 3 4 5 6 7 ... 15 16 17 18 19 20

## The `xarray.Dataset` is a very useful container.

It has a nice string representation, as shown above. We can access specific variables by name:

In [27]:
dataset['img']

<xarray.DataArray 'img' (time: 20, dim_0: 10, dim_1: 10)>
array([[[1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.],
        ...,
        [1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.]],

       [[1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.],
        ...,
        [1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.]],

       ...,

       [[1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.],
        ...,
        [1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.]],

       [[1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.],
        ...,
        [1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.]]])
Coordinates:
  * time     (time) float64 1.544e+09 1.544e+09 ... 1.544e+09 1.544e+09
Dimensions without coordinates: dim_0, dim_1

Do math along named dimensions:

In [28]:
dataset['img'].sum('time')

<xarray.DataArray 'img' (dim_0: 10, dim_1: 10)>
array([[20., 20., 20., 20., 20., 20., 20., 20., 20., 20.],
       [20., 20., 20., 20., 20., 20., 20., 20., 20., 20.],
       [20., 20., 20., 20., 20., 20., 20., 20., 20., 20.],
       [20., 20., 20., 20., 20., 20., 20., 20., 20., 20.],
       [20., 20., 20., 20., 20., 20., 20., 20., 20., 20.],
       [20., 20., 20., 20., 20., 20., 20., 20., 20., 20.],
       [20., 20., 20., 20., 20., 20., 20., 20., 20., 20.],
       [20., 20., 20., 20., 20., 20., 20., 20., 20., 20.],
       [20., 20., 20., 20., 20., 20., 20., 20., 20., 20.],
       [20., 20., 20., 20., 20., 20., 20., 20., 20., 20.]])
Dimensions without coordinates: dim_0, dim_1

Slice along named dimensions:

In [29]:
dataset['img'].sel(dim_0=slice(0, 3), dim_1=slice(5, 10))

<xarray.DataArray 'img' (time: 20, dim_0: 3, dim_1: 5)>
array([[[1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.]],

       [[1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.]],

       ...,

       [[1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.]],

       [[1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.]]])
Coordinates:
  * time     (time) float64 1.544e+09 1.544e+09 ... 1.544e+09 1.544e+09
Dimensions without coordinates: dim_0, dim_1