# Facility-Scale Deployment: Message Bus, MongoDB, intake server

## Scenario

This is an approximation of a deployment "at scale," such as at a large experimental facility.

Here, bluesky's **RunEngine** is configured to dispatch the documents generated during data acquisition to a (very lightweight) **message bus**. A **consumer** is listening to the message bus and inserting Documents into a **MongoDB** instance, which stores the data at rest.

RunEngine -> 0MQ Publisher -> 0MQ Proxy -> 0MQ Subscriber -> MongoDB
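The pipeline above can be modeled in a few lines of plain Python. This is a conceptual sketch only. Each `queue.Queue` stands in for a 0MQ socket hop. The `STOP` sentinel and the function names are hypothetical. A `defaultdict` of lists keyed by document name stands in for MongoDB collections, and the real consumer's collection layout may differ. The point is that each hop only forwards `(name, doc)` pairs, so the consumer can file every document without knowing anything about the experiment.

```python
import queue
import threading
from collections import defaultdict

# Hypothetical stand-ins: each queue plays the role of one 0MQ hop.
to_proxy = queue.Queue()       # publisher -> proxy
to_subscriber = queue.Queue()  # proxy -> subscriber
STOP = object()                # sentinel marking the end of the stream


def publisher(docs):
    """Plays RunEngine + Publisher: push each (name, doc) pair onto the bus."""
    for name, doc in docs:
        to_proxy.put((name, doc))
    to_proxy.put(STOP)


def proxy():
    """Plays the 0MQ proxy: forward every message unchanged."""
    while True:
        msg = to_proxy.get()
        to_subscriber.put(msg)
        if msg is STOP:
            break


def subscriber(store):
    """Plays the consumer: file each document under its name ('collection')."""
    while True:
        msg = to_subscriber.get()
        if msg is STOP:
            break
        name, doc = msg
        store[name].append(doc)


docs = [("start", {"uid": "s1"}),
        ("descriptor", {"uid": "d1", "run_start": "s1"}),
        ("event", {"uid": "e1", "descriptor": "d1"}),
        ("stop", {"uid": "x1", "run_start": "s1"})]

store = defaultdict(list)  # stands in for MongoDB: collection name -> documents
workers = [threading.Thread(target=proxy),
           threading.Thread(target=subscriber, args=(store,))]
for t in workers:
    t.start()
publisher(docs)
for t in workers:
    t.join()
print({name: len(coll) for name, coll in store.items()})
# → {'start': 1, 'descriptor': 1, 'event': 1, 'stop': 1}
```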

An **intake server** has access to the MongoDB and to any externally-stored files written directly to disk by large detectors. It serves all of this to users over HTTP.

In this demo deployment, everything happens to be on the same machine, but it need not be. Users never connect directly to MongoDB or access the filesystem where data files are stored; all access goes through the intake server.

## First: Check that services are running

The lightweight message bus (``bluesky-0MQ-proxy``), the consumer (``consumer.py``), a user-space MongoDB daemon (``run-mongobox.py``), and the ``intake-server`` should already have been started automatically by supervisor. Confirm that they are running:

In [None]:
!supervisorctl -c supervisor/supervisord.conf status

In [None]:
%matplotlib notebook
from bluesky.utils import install_nb_kicker
install_nb_kicker()  # lets live plots keep updating while RE is running

from bluesky import RunEngine
from bluesky.plans import scan
from bluesky.preprocessors import SupplementalData
from ophyd.sim import motor, det  # simulated motor and detector
from bluesky.callbacks.zmq import Publisher
from bluesky.callbacks.mpl_plotting import LivePlot

RE = RunEngine({})  # executor for experiment procedures
# Send every document emitted by RE to the 0MQ proxy at localhost:5577.
publish_to_message_bus = Publisher('localhost:5577')
RE.subscribe(publish_to_message_bus);

## Acquire Data Using Bluesky

In [None]:
RE(scan([det], motor, -1, 1, 20))

The documents generated by the scan were published in a streaming fashion over the message bus and ultimately stored as Documents in MongoDB. The uid(s) returned by ``RE`` uniquely identify the data.
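Every run is a stream of `(name, doc)` pairs: one start document, one descriptor per Event Stream, one event per reading, and one stop document, linked to each other by uids. The sketch below builds a toy stream of that shape with the standard library. It includes only a subset of the fields bluesky's document schema defines. The helper name `toy_scan_documents` and the Gaussian `detector_reading` are hypothetical stand-ins, not the simulated detector's actual implementation. The uid returned by `RE` is the `uid` of the start document, which is why it is enough to find the whole run later.

```python
import math
import time
import uuid


def toy_scan_documents(positions, detector_reading):
    """Yield (name, doc) pairs shaped like a bluesky run (subset of fields only).

    Hypothetical helper for illustration; not part of bluesky.
    """
    start = {"uid": str(uuid.uuid4()), "time": time.time(), "plan_name": "scan"}
    yield "start", start
    descriptor = {"uid": str(uuid.uuid4()), "time": time.time(),
                  "run_start": start["uid"],  # link back to the run
                  "name": "primary",
                  "data_keys": {"motor": {"dtype": "number", "shape": [], "source": "sim"},
                                "det": {"dtype": "number", "shape": [], "source": "sim"}}}
    yield "descriptor", descriptor
    for seq_num, x in enumerate(positions, start=1):
        now = time.time()
        yield "event", {"uid": str(uuid.uuid4()), "time": now,
                        "descriptor": descriptor["uid"],  # link to its stream
                        "seq_num": seq_num,
                        "data": {"motor": x, "det": detector_reading(x)},
                        "timestamps": {"motor": now, "det": now}}
    yield "stop", {"uid": str(uuid.uuid4()), "time": time.time(),
                   "run_start": start["uid"], "exit_status": "success"}


# 20 evenly spaced points from -1 to 1, like scan([det], motor, -1, 1, 20).
positions = [-1 + 2 * i / 19 for i in range(20)]
docs = list(toy_scan_documents(positions, lambda x: math.exp(-x ** 2)))
start_uid = docs[0][1]["uid"]  # the identifier RE would hand back
print(len(docs))  # → 23
```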

To get some live feedback, we can additionally dispatch the documents to a live-updating plot. We'll also capture the unique ID in a variable.

In [None]:
uid, = RE(scan([det], motor, -1, 1, 20), LivePlot('det', 'motor'))
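`LivePlot`, like anything passed to `RE` alongside the plan, is simply a callable that receives each `(name, doc)` pair as it is emitted. A minimal sketch of such a callback is shown below; it records the x/y pairs that a plot of `det` versus `motor` would draw. `CollectXY` is a hypothetical name, the event values are made-up toy numbers, and, unlike a real plotting callback, it ignores which stream an event belongs to.

```python
class CollectXY:
    """Hypothetical minimal callback: record one (x, y) pair per event."""

    def __init__(self, y, x):
        self.y, self.x = y, x
        self.xs, self.ys = [], []

    def __call__(self, name, doc):
        # Only event documents carry readings; other documents are ignored.
        if name == "event":
            self.xs.append(doc["data"][self.x])
            self.ys.append(doc["data"][self.y])


collector = CollectXY("det", "motor")
toy_stream = [
    ("start", {"uid": "s1"}),
    ("descriptor", {"uid": "d1", "run_start": "s1", "name": "primary"}),
    ("event", {"descriptor": "d1", "seq_num": 1, "data": {"motor": -1.0, "det": 0.37}}),
    ("event", {"descriptor": "d1", "seq_num": 2, "data": {"motor": 0.0, "det": 1.0}}),
    ("stop", {"run_start": "s1", "exit_status": "success"}),
]
for name, doc in toy_stream:
    collector(name, doc)
print(collector.xs, collector.ys)  # → [-1.0, 0.0] [0.37, 1.0]
```

In the notebook, an instance could be passed to `RE` in the same position as `LivePlot`, e.g. `RE(scan([det], motor, -1, 1, 20), collector)`.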

## Access saved data using intake

### Connect to the intake server

In [None]:
from intake import Catalog
import intake_bluesky.mongo_layout1

catalog = Catalog('intake://localhost:5000')

### Find the 'run' of interest

In this case, we have its globally unique ID, so we can just look it up directly.

In [None]:
run = catalog['xyz'][uid]()

### For interactive analysis, get a PyData/Scipy structure (e.g. xarray)

Each Event Stream (logical table) in the acquired data is a subcatalog. The number and names of the streams depend on the application, but ``'primary'`` is commonly the one of interest.

In [None]:
run.primary.read()
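Reading a stream as a "logical table" amounts to collecting the events whose descriptor belongs to that stream and lining up their `data` as columns. This stdlib sketch shows the idea. `stream_to_columns` is a hypothetical helper, and the documents are toy data. The actual `read()` call returns an xarray structure rather than a plain dict. The `baseline` stream is included only to show that events from other streams are filtered out.

```python
from collections import defaultdict


def stream_to_columns(documents, stream_name="primary"):
    """Hypothetical helper: gather one stream's events into columns by seq_num."""
    wanted = set()  # descriptor uids that belong to `stream_name`
    events = []
    for name, doc in documents:
        if name == "descriptor" and doc.get("name") == stream_name:
            wanted.add(doc["uid"])
        elif name == "event" and doc["descriptor"] in wanted:
            events.append(doc)
    events.sort(key=lambda ev: ev["seq_num"])  # rows in acquisition order
    columns = defaultdict(list)
    for ev in events:
        columns["seq_num"].append(ev["seq_num"])
        for key, value in ev["data"].items():
            columns[key].append(value)
    return dict(columns)


toy_docs = [
    ("start", {"uid": "s1"}),
    ("descriptor", {"uid": "d1", "run_start": "s1", "name": "primary"}),
    ("descriptor", {"uid": "d2", "run_start": "s1", "name": "baseline"}),
    ("event", {"descriptor": "d2", "seq_num": 1, "data": {"temperature": 295.0}}),
    ("event", {"descriptor": "d1", "seq_num": 1, "data": {"motor": -1.0, "det": 0.6}}),
    ("event", {"descriptor": "d1", "seq_num": 2, "data": {"motor": 1.0, "det": 0.6}}),
    ("stop", {"run_start": "s1", "exit_status": "success"}),
]
print(stream_to_columns(toy_docs))
# → {'seq_num': [1, 2], 'motor': [-1.0, 1.0], 'det': [0.6, 0.6]}
```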

### To rerun bluesky's streaming/viz tooling on saved data, get the original Documents

This reproduces the same stream of Documents that was originally emitted by ``RE``.

In [None]:
live_plot = LivePlot('det', 'motor')
for name, doc in catalog['xyz'][uid].read_canonical():
    live_plot(name, doc)
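A store that files documents by type, with one collection per document kind, must put them back in order before they can be replayed. One plausible way to do that for a single run is sketched below. The start document comes first, then descriptors and events sorted by their `time` field (descriptors win ties), then the stop document. `replay` and the `stored` data are hypothetical illustrations of the idea, not a description of how `read_canonical` is implemented.

```python
def replay(by_type):
    """Hypothetical: rebuild one run's (name, doc) stream from per-type lists.

    Assumes a single run and that every descriptor and event has a 'time'.
    """
    yield "start", by_type["start"][0]
    middle = [(doc["time"], name, doc)
              for name in ("descriptor", "event")
              for doc in by_type.get(name, [])]
    # A descriptor is emitted before the events that reference it, so sorting
    # by time (descriptors first on ties) restores a valid order.
    middle.sort(key=lambda item: (item[0], item[1] != "descriptor"))
    for _, name, doc in middle:
        yield name, doc
    for doc in by_type.get("stop", []):
        yield "stop", doc


stored = {
    "start": [{"uid": "s1", "time": 0.0}],
    "descriptor": [{"uid": "d1", "run_start": "s1", "time": 1.0, "name": "primary"}],
    "event": [{"descriptor": "d1", "seq_num": 2, "time": 3.0, "data": {"det": 0.6}},
              {"descriptor": "d1", "seq_num": 1, "time": 2.0, "data": {"det": 1.0}}],
    "stop": [{"run_start": "s1", "time": 4.0, "exit_status": "success"}],
}
replayed = list(replay(stored))
print([name for name, _ in replayed])
# → ['start', 'descriptor', 'event', 'event', 'stop']
```

Each replayed pair can then be handed to a callback exactly as in the loop above, e.g. `for name, doc in replay(stored): live_plot(name, doc)`.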