# Intake for Bluesky

## Setup: Acquire some sample data.

For data acquisition (but not for data access!) we assume that we have direct access to MongoDB (or some message queue that has a sink into MongoDB).

In [1]:
from bluesky import RunEngine
from intake_bluesky import MongoInsertCallback
from bluesky.plans import scan
from bluesky.preprocessors import SupplementalData
from ophyd.sim import motor, det, direct_img, img

RE = RunEngine({})
sd = SupplementalData(baseline=[motor])
RE.preprocessors.append(sd)

# This is just a simple callback that does MongoDB insert_one. No databroker.
metadatastore_uri = 'mongodb://localhost:27017/test1'
assets_uri = 'mongodb://localhost:27017/test1'
insert = MongoInsertCallback(metadatastore_uri, assets_uri)
RE.subscribe(insert)


uid, = RE(scan([det], motor, -1, 1, 20))
direct_img_uid, = RE(scan([direct_img], motor, -1, 1, 20))
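
To make the role of the insert callback concrete, here is a minimal sketch of a callback with the same shape that stores documents in in-memory lists instead of MongoDB. The RunEngine calls each subscriber with a `(name, doc)` pair. The class name `InMemoryInsertCallback` and its `collections` attribute are hypothetical; they are not part of `intake_bluesky`, whose actual implementation is not shown here.

```python
from collections import defaultdict


class InMemoryInsertCallback:
    """Hypothetical stand-in for MongoInsertCallback, for illustration only.

    Stores each document in a list keyed by document name, where a
    MongoDB-backed callback would insert it into a collection.
    """

    def __init__(self):
        self.collections = defaultdict(list)

    def __call__(self, name, doc):
        # Copy so later mutation by the caller cannot alter what was stored.
        self.collections[name].append(dict(doc))


# Feed it a hand-written document stream shaped like one short run.
insert_sketch = InMemoryInsertCallback()
for name, doc in [
    ("start", {"uid": "run-1", "plan_name": "scan"}),
    ("descriptor", {"uid": "desc-1", "run_start": "run-1", "name": "primary"}),
    ("event", {"descriptor": "desc-1", "seq_num": 1, "data": {"det": 0.6}}),
    ("event", {"descriptor": "desc-1", "seq_num": 2, "data": {"det": 0.7}}),
    ("stop", {"run_start": "run-1", "exit_status": "success"}),
]:
    insert_sketch(name, doc)

print({name: len(docs) for name, docs in insert_sketch.collections.items()})
```

Because the object is just a callable taking `(name, doc)`, something of this shape could be passed to `RE.subscribe` in the same way as `insert` above.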

## Open an intake Catalog.

We could use intake to access the data _directly_ like this, though we will probably never do so at NSLS-II.

In [2]:
from intake_bluesky import MongoMetadataStoreCatalog

mds = MongoMetadataStoreCatalog(metadatastore_uri)
mds

<Intake catalog: mongodb://localhost:27017/test1>

Instead we will access data through an HTTP service. We will start an intake server like this:

```
intake-server facility_catalog.yml
```

where `facility_catalog.yml` encodes the MongoDB ``uri`` above, and potentially many such URIs:

In [3]:
%cat facility_catalog.yml

plugins:
  source:
    - module: intake_bluesky
sources:
  xyz:
    description: Some imaginary beamline
    driver: mongo_metadatastore
    container: catalog
    args:
      uri: mongodb://localhost:27017/test1
    metadata:
      beamline: "00-ID"


In [4]:
import intake

facility_catalog = intake.Catalog("intake://localhost:5000", page_size=100)
facility_catalog

<Intake catalog: None>

A Catalog contains entries, which we can access by iteration:

```
for entry in catalog:
    ...
```

or individually by name:

```
entry = catalog[entry_name]
```

For small Catalogs, it is convenient to ``list`` their contents.

In [5]:
list(facility_catalog)

['xyz']

The ``facility_catalog`` contains a catalog for each beamline. Let's access the ``xyz`` entry, which is also a Catalog.

In [6]:
cat = facility_catalog['xyz']()
cat

<Intake catalog: xyz>

Each entry in this Catalog represents one scan. There are too many to list them all. (We could _try_ but it would take a long time and probably run out of memory.)

We can find scans of interest in a couple of ways.

## Progressive Search

We can search ``cat`` by passing it a Mongo Query. The result is another Catalog, with a subset of the entries in ``cat``.

In [7]:
search_results = cat.search({'plan_name': 'scan'})
search_results

<Intake catalog: None>

We can progressively search, generating yet another Catalog.

In [8]:
import time
recent_counts = search_results.search({'time': {'$gt': time.time() - 60 * 60 * 24}})
recent_counts

<Intake catalog: None>
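
The semantics of progressive search can be sketched in plain Python. The example below is an illustration under stated assumptions, not how `intake_bluesky` or MongoDB evaluates queries: `matches` and `SketchCatalog` are hypothetical names, the matcher supports only equality plus the `$gt` and `$lt` operators, and the "catalog" is a list of hand-written start documents.

```python
def matches(doc, query):
    """Return True if doc satisfies a small subset of Mongo query syntax.

    Only equality, $gt, and $lt are handled; this is an illustrative
    matcher, not MongoDB's implementation.
    """
    for key, condition in query.items():
        if key not in doc:
            return False
        value = doc[key]
        if isinstance(condition, dict):
            for op, operand in condition.items():
                if op == "$gt":
                    ok = value > operand
                elif op == "$lt":
                    ok = value < operand
                else:
                    raise ValueError(f"operator {op!r} is not supported in this sketch")
                if not ok:
                    return False
        elif value != condition:
            return False
    return True


class SketchCatalog:
    """Hypothetical catalog over in-memory start documents."""

    def __init__(self, start_docs):
        self._docs = list(start_docs)

    def search(self, query):
        # Each search returns a new, narrower catalog; the original is untouched.
        return SketchCatalog(doc for doc in self._docs if matches(doc, query))

    def __iter__(self):
        return (doc["uid"] for doc in self._docs)


cat_sketch = SketchCatalog([
    {"uid": "a", "plan_name": "scan", "time": 100.0},
    {"uid": "b", "plan_name": "count", "time": 200.0},
    {"uid": "c", "plan_name": "scan", "time": 300.0},
])
scans = cat_sketch.search({"plan_name": "scan"})
recent_scans = scans.search({"time": {"$gt": 150.0}})
combined = cat_sketch.search({"plan_name": "scan", "time": {"$gt": 150.0}})
print(list(scans), list(recent_scans), list(combined))
```

In this sketch, chaining two searches selects the same entries as a single search whose query contains both conditions.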

Having narrowed the results to a small Catalog, we can list them.

In [9]:
list(recent_counts)

['d6f9d017-da67-4e83-b90f-f9fe519316b2',
 'bd588760-0430-4419-b5f0-d013a7f52106',
 'c7769cf3-74d7-4a7a-aa6a-365c6a10f152',
 '32a7709b-35e8-42e2-b523-0de5f732af31',
 'cf64f61e-5568-4927-8134-31b4843fbed3',
 '826a95aa-47cc-4c1e-bc81-31f12ebbade5',
 '9760b89d-cd94-4211-98c8-be481d8633ac',
 '3dc9226a-54f3-49f3-9257-0d9d401175e9',
 'd1b2ebbc-c528-4f6b-bbd5-4d977eb0d86b',
 '2495c698-8967-430a-b806-ab9393f2a18a',
 'b282b431-f25a-4297-a4f6-c1ed27a420ba',
 '07357e9e-d40b-409f-adae-be471538f53b',
 'ea5b4a16-c22b-4230-8250-5b2ac1c30107',
 'f24fea2b-6994-4529-9c80-3696057a58ef',
 '2e92dd1b-958e-4924-8089-091502a3b2bc',
 '2b8ef77a-dec6-4b8f-8d3e-0be76d73b0b3',
 '0fd0d4ad-d87a-4611-a3d5-5b95b8d3df9c',
 '88247e6f-bf90-4aca-a204-dfb8ad6f1600',
 '026300f2-b001-4af1-b95b-23c8a0923a52',
 '5ab10c09-c2b8-4723-a716-f5d4327934dd',
 '9e3e75c0-b470-40a6-96b4-72080dcf7cbe',
 '97eb9101-ae36-4ef6-b5e8-460421cbe548',
 'be9c60c0-597a-4e56-994e-617253dc1e95',
 'eb3ddeb8-9db7-4bd3-9f27-57176a72e409']

## Random access by unique ID (`uid`), recency, and `scan_id`

We can access entries by their unique ID "name" as in:

In [10]:
entry = cat[uid]  # uid we captured above during data acquisition
entry

<Catalog Entry: bd588760-0430-4419-b5f0-d013a7f52106>

We can also access entries by *recency* with this syntactic sugar:

In [11]:
recent_counts[-1]

<Catalog Entry: -1>

A positive integer matches the most recent entry with the corresponding ``scan_id``, which is not necessarily globally unique.

In [12]:
cat[3]

<Catalog Entry: 3>
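
The three lookup rules described in this section can be summarized in a short sketch. It assumes a list of start documents ordered oldest to newest; the function name `lookup` is hypothetical, and treating `0` as a `scan_id` is an assumption of this sketch rather than documented behavior.

```python
def lookup(start_docs, key):
    """Resolve key against start documents ordered oldest to newest.

    str          -> the run with that uid
    negative int -> recency (-1 is the most recent run)
    other int    -> the most recent run with that scan_id
    """
    if isinstance(key, str):
        for doc in start_docs:
            if doc["uid"] == key:
                return doc
        raise KeyError(key)
    if key < 0:
        try:
            return start_docs[key]
        except IndexError:
            raise KeyError(key) from None
    # scan_id may repeat (for example after a counter reset), so search
    # from the newest run backwards and return the first match.
    for doc in reversed(start_docs):
        if doc["scan_id"] == key:
            return doc
    raise KeyError(key)


runs = [
    {"uid": "a", "scan_id": 3},
    {"uid": "b", "scan_id": 4},
    {"uid": "c", "scan_id": 3},  # scan_id 3 reused by a later run
]
print(lookup(runs, "a")["uid"], lookup(runs, -1)["uid"], lookup(runs, 3)["uid"])
```

Here `scan_id` 3 appears twice, and the integer lookup returns the newer run `"c"`, while the `uid` lookup still reaches the older run `"a"` unambiguously.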

## Metadata

The entry's metadata is available via ``entry.metadata``. Notice that this includes ``entry.metadata.start`` and ``entry.metadata.stop``, the documents generated at the beginning and end of the corresponding scan.

In [13]:
entry.metadata

{}

## Accessing Data

The Entry corresponding to one scan is itself a Catalog, named for the `uid`.

In [14]:
entry()

<Intake catalog: bd588760-0430-4419-b5f0-d013a7f52106>

It has an entry for each stream of data captured during that scan. Typically there is a ``'primary'`` stream and potentially others, but this is just a convention.

In [15]:
list(entry())

['baseline', 'primary']

We can pull the data from the 'primary' stream all at once:

In [16]:
entry().primary().read()

<xarray.Dataset>
Dimensions:         (time: 20)
Coordinates:
  * time            (time) float64 1.544e+09 1.544e+09 ... 1.544e+09 1.544e+09
Data variables:
    det             (time) float64 0.6065 0.6701 0.7322 ... 0.7322 0.6701 0.6065
    motor           (time) float64 -1.0 -0.8947 -0.7895 ... 0.7895 0.8947 1.0
    motor_setpoint  (time) float64 -1.0 -0.8947 -0.7895 ... 0.7895 0.8947 1.0
    seq_num         (time) int64 1 2 3 4 5 6 7 8 9 ... 13 14 15 16 17 18 19 20

Or lazily, using dask:

In [17]:
entry().primary().to_dask()  # an xarray of dask.arrays

<xarray.Dataset>
Dimensions:         (time: 20)
Coordinates:
  * time            (time) float64 1.544e+09 1.544e+09 ... 1.544e+09 1.544e+09
Data variables:
    det             (time) float64 dask.array<shape=(20,), chunksize=(20,)>
    motor           (time) float64 dask.array<shape=(20,), chunksize=(20,)>
    motor_setpoint  (time) float64 dask.array<shape=(20,), chunksize=(20,)>
    seq_num         (time) int64 dask.array<shape=(20,), chunksize=(20,)>

The dataset above is lazy: dask makes calls to the server to pull the data only when required, for example when we convert the data to a ``pandas.DataFrame``.

In [18]:
entry().primary().to_dask().to_dataframe()

| time | det | motor | motor_setpoint | seq_num |
|---|---|---|---|---|
| 1543980000.0 | 0.606531 | -1.0 | -1.0 | 1 |
| 1543980000.0 | 0.670134 | -0.894737 | -0.894737 | 2 |
| 1543980000.0 | 0.732249 | -0.789474 | -0.789474 | 3 |
| 1543980000.0 | 0.791305 | -0.684211 | -0.684211 | 4 |
| 1543980000.0 | 0.8457 | -0.578947 | -0.578947 | 5 |
| 1543980000.0 | 0.893876 | -0.473684 | -0.473684 | 6 |
| 1543980000.0 | 0.934385 | -0.368421 | -0.368421 | 7 |
| 1543980000.0 | 0.965967 | -0.263158 | -0.263158 | 8 |
| 1543980000.0 | 0.987612 | -0.157895 | -0.157895 | 9 |
| 1543980000.0 | 0.998616 | -0.052632 | -0.052632 | 10 |
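
The idea of deferring the fetch until the values are needed can be illustrated without dask. The sketch below is not dask's API: `LazyColumn`, `fake_server_fetch`, and `fetch_log` are hypothetical names, and the "server" is a local dictionary. It only shows that building the lazy object costs no fetch and that the fetch happens on first use.

```python
class LazyColumn:
    """Hypothetical deferred column: fetches its values on first use."""

    def __init__(self, name, fetch):
        self.name = name
        self._fetch = fetch
        self._cache = None

    def compute(self):
        # Fetch at most once, then reuse the cached values.
        if self._cache is None:
            self._cache = self._fetch(self.name)
        return self._cache


fetch_log = []  # records every simulated call to the server


def fake_server_fetch(name):
    fetch_log.append(name)
    return {"det": [0.6065, 0.6701], "motor": [-1.0, -0.8947]}[name]


columns = {name: LazyColumn(name, fake_server_fetch) for name in ("det", "motor")}
calls_before = list(fetch_log)       # nothing fetched yet
det_values = columns["det"].compute()
det_again = columns["det"].compute()  # served from the cache
calls_after = list(fetch_log)         # only "det" was ever requested
print(calls_before, calls_after)
```

Only the column that was actually computed triggered a request, which is the property that makes lazy access attractive for large scans.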


We can look at the data from the other stream, 'baseline'.

In [19]:
entry().baseline().read()

<xarray.Dataset>
Dimensions:         (time: 2)
Coordinates:
  * time            (time) float64 1.544e+09 1.544e+09
Data variables:
    motor           (time) float64 0.0 1.0
    motor_setpoint  (time) float64 0.0 1.0
    seq_num         (time) int64 1 2

Or merge all the streams together into one `xarray.Dataset`:

In [20]:
import xarray

xarray.merge(entry()[key].read() for key in entry())

<xarray.Dataset>
Dimensions:         (time: 22)
Coordinates:
  * time            (time) float64 1.544e+09 1.544e+09 ... 1.544e+09 1.544e+09
Data variables:
    motor           (time) float64 0.0 -1.0 -0.8947 -0.7895 ... 0.8947 1.0 1.0
    motor_setpoint  (time) float64 0.0 -1.0 -0.8947 -0.7895 ... 0.8947 1.0 1.0
    seq_num         (time) float64 1.0 1.0 2.0 3.0 4.0 ... 18.0 19.0 20.0 2.0
    det             (time) float64 nan 0.6065 0.6701 ... 0.6701 0.6065 nan

which creates a "block matrix" sorted on time, clearly visible when cast into a DataFrame:

In [21]:
xarray.merge(entry()[key].read() for key in entry()).to_dataframe()

| time | motor | motor_setpoint | seq_num | det |
|---|---|---|---|---|
| 1543980000.0 | 0.0 | 0.0 | 1.0 | NaN |
| 1543980000.0 | -1.0 | -1.0 | 1.0 | 0.606531 |
| 1543980000.0 | -0.894737 | -0.894737 | 2.0 | 0.670134 |
| 1543980000.0 | -0.789474 | -0.789474 | 3.0 | 0.732249 |
| 1543980000.0 | -0.684211 | -0.684211 | 4.0 | 0.791305 |
| 1543980000.0 | -0.578947 | -0.578947 | 5.0 | 0.8457 |
| 1543980000.0 | -0.473684 | -0.473684 | 6.0 | 0.893876 |
| 1543980000.0 | -0.368421 | -0.368421 | 7.0 | 0.934385 |
| 1543980000.0 | -0.263158 | -0.263158 | 8.0 | 0.965967 |
| 1543980000.0 | -0.157895 | -0.157895 | 9.0 | 0.987612 |
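
The "block matrix" shape is what an outer join on `time` produces: every timestamp from every stream gets a row, and columns a stream did not record are filled with NaN. The plain-Python sketch below illustrates that behavior under simplifying assumptions. `outer_merge` is a hypothetical helper, not an xarray function, and it assumes no two streams share a timestamp.

```python
import math


def outer_merge(streams):
    """Outer-join streams on "time", filling missing values with NaN.

    streams maps a stream name to a list of row dicts, each with a "time"
    key. Assumes timestamps are unique across streams.
    """
    # Collect every data column seen in any stream, in first-seen order.
    columns = []
    for rows in streams.values():
        for row in rows:
            for key in row:
                if key != "time" and key not in columns:
                    columns.append(key)

    merged = {}
    for rows in streams.values():
        for row in rows:
            target = merged.setdefault(row["time"], {c: math.nan for c in columns})
            for key, value in row.items():
                if key != "time":
                    target[key] = value

    # One row per timestamp, sorted on time.
    return [{"time": t, **merged[t]} for t in sorted(merged)]


table = outer_merge({
    "baseline": [
        {"time": 0.0, "motor": 0.0},
        {"time": 3.0, "motor": 1.0},
    ],
    "primary": [
        {"time": 1.0, "motor": -1.0, "det": 0.6065},
        {"time": 2.0, "motor": 1.0, "det": 0.6065},
    ],
})
for row in table:
    print(row)
```

The baseline readings land in the first and last rows with `det` set to NaN, mirroring the pattern in the merged dataset above.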


## N-dimensional Data (e.g. images)

Image data can sit in an `xarray.Dataset` alongside other data. The `to_dask()` method allows us to fetch it lazily if desired.

In [22]:
entry = cat[direct_img_uid]  # uid captured during data acquisition above
dataset = entry().primary().read()
dataset

<xarray.Dataset>
Dimensions:         (dim_0: 10, dim_1: 10, time: 20)
Coordinates:
  * time            (time) float64 1.544e+09 1.544e+09 ... 1.544e+09 1.544e+09
Dimensions without coordinates: dim_0, dim_1
Data variables:
    img             (time, dim_0, dim_1) float64 1.0 1.0 1.0 1.0 ... 1.0 1.0 1.0
    motor           (time) float64 -1.0 -0.8947 -0.7895 ... 0.7895 0.8947 1.0
    motor_setpoint  (time) float64 -1.0 -0.8947 -0.7895 ... 0.7895 0.8947 1.0
    seq_num         (time) int64 1 2 3 4 5 6 7 8 9 ... 13 14 15 16 17 18 19 20

In [23]:
dataset['img']

<xarray.DataArray 'img' (time: 20, dim_0: 10, dim_1: 10)>
array([[[1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.],
        ...,
        [1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.]],

       [[1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.],
        ...,
        [1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.]],

       ...,

       [[1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.],
        ...,
        [1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.]],

       [[1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.],
        ...,
        [1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.]]])
Coordinates:
  * time     (time) float64 1.544e+09 1.544e+09 ... 1.544e+09 1.544e+09
Dimensions without coordinates: dim_0, dim_1

Do math along named dimensions:

In [24]:
dataset['img'].sum('time')

<xarray.DataArray 'img' (dim_0: 10, dim_1: 10)>
array([[20., 20., 20., 20., 20., 20., 20., 20., 20., 20.],
       [20., 20., 20., 20., 20., 20., 20., 20., 20., 20.],
       [20., 20., 20., 20., 20., 20., 20., 20., 20., 20.],
       [20., 20., 20., 20., 20., 20., 20., 20., 20., 20.],
       [20., 20., 20., 20., 20., 20., 20., 20., 20., 20.],
       [20., 20., 20., 20., 20., 20., 20., 20., 20., 20.],
       [20., 20., 20., 20., 20., 20., 20., 20., 20., 20.],
       [20., 20., 20., 20., 20., 20., 20., 20., 20., 20.],
       [20., 20., 20., 20., 20., 20., 20., 20., 20., 20.],
       [20., 20., 20., 20., 20., 20., 20., 20., 20., 20.]])
Dimensions without coordinates: dim_0, dim_1

Slice along named dimensions:

In [25]:
dataset['img'].sel(dim_0=slice(0, 3), dim_1=slice(5, 10))

<xarray.DataArray 'img' (time: 20, dim_0: 3, dim_1: 5)>
array([[[1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.]],

       [[1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.]],

       ...,

       [[1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.]],

       [[1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.],
        [1., 1., ..., 1., 1.]]])
Coordinates:
  * time     (time) float64 1.544e+09 1.544e+09 ... 1.544e+09 1.544e+09
Dimensions without coordinates: dim_0, dim_1