# "We'll Cross the Streams": Combining Asynchronous Data Streams

![Ghostbusters reference](https://media.giphy.com/media/3o72EWUgbRNfLegO1W/giphy.gif)

![Ghostbusters reference](https://thumbs.gfycat.com/ThirstyEnchantedCaudata-size_restricted.gif)

## Configuration

This code would normally live in a script that runs automatically at startup, so the user would not have to worry about it.

In [None]:
%run ../beamline_configuration.py

## Monitor current asynchronously while counting the detector

Monitoring can be done on a one-off basis, but it's typically set up in a semi-permanent way: you want to "set it and forget it." The ``sd`` object keeps track of things to monitor concurrently with other measurements. We'll add the beam current signal ``I`` to that list.

In [None]:
sd

In [None]:
sd.monitors.append(I)

Now while we scan a motor and read a detector, ``I`` will be monitored in the background.

In [None]:
RE(scan([slit], motor_slit, -15, 15, 150))

Get the data as before. The dataset is large, so we'll use the `.head()` method to show just the first several rows.

In [None]:
header = db[-1]
header.table().head()  # shows the 'primary' stream by default

In [None]:
header.table(stream_name='primary').head()  # equivalent to the above

What other streams are there?

In [None]:
header.stream_names

In [None]:
header.table('I_monitor').head()

### Plot the data streams together

We can plot each stream against time.

In [None]:
plt.figure()
plt.plot('time', 'slit_det', data=header.table(), marker='o', label='slit_det')
plt.plot('time', 'I', data=header.table(stream_name='I_monitor'), marker='x', label='I')
plt.legend()

We cannot plot ``slit_det`` vs ``I`` or normalize ``slit_det`` by ``I`` because the two were never measured at exactly the same time. We'll have to interpolate, downsample, or some combination to get the two measurements into one unified time series.

## "Muxing" (combining into one time series) the streams

To start, we'll use a pandas function to stack the two tables end to end (`axis=0`). Because each table has columns the other lacks, the result is a sort of "block matrix" padded with missing data (NaN).

In [None]:
import pandas as pd
data = pd.concat([header.table('primary'), header.table('I_monitor')], axis=0)
data
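
To see where the block pattern comes from, here is a minimal sketch using two toy tables. The values are invented for illustration, and `time` is a plain float here rather than the timestamps a real `header.table()` returns.

```python
import pandas as pd

# Toy stand-ins for the 'primary' and 'I_monitor' tables (invented values).
primary = pd.DataFrame({
    'time': [0.0, 1.0],
    'motor_slit': [-15.0, -14.8],
    'slit_det': [100.0, 110.0],
})
monitor = pd.DataFrame({
    'time': [0.3, 0.7, 1.4],
    'I': [500.0, 499.0, 498.0],
})

# axis=0 stacks rows. The columns become the union of both tables, so each
# block of rows is NaN in the columns that belong to the other stream.
toy = pd.concat([primary, monitor], axis=0, sort=False)
print(toy)
```

Only `time` is populated in every row, which is why it makes a natural index for the next step.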

In [None]:
# Make 'time' the index and sort on it.
sorted_data = data.set_index('time').sort_index()
sorted_data.head(20)

### Crude but conceptually simple: interpolate everything with "forward-fill"

In [None]:
ffilled_data = sorted_data.ffill()
ffilled_data.head(20)
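
As a rough illustration of what forward-fill does, the sketch below applies it to a tiny muxed table. The numbers are invented and the float `time` index is a simplification.

```python
import numpy as np
import pandas as pd

# Toy muxed table, already indexed and sorted by time (invented values).
toy = pd.DataFrame(
    {'slit_det': [100.0, np.nan, np.nan, 110.0, np.nan],
     'I':        [np.nan, 500.0, 499.0, np.nan, 498.0]},
    index=pd.Index([0.0, 0.3, 0.7, 1.0, 1.4], name='time'),
)

# Each NaN is replaced by the most recent earlier value in the same column.
filled = toy.ffill()
print(filled)
```

The first row keeps a NaN in `I` because no earlier reading exists to carry forward, so anything computed from `I` at that row is NaN as well.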

In [None]:
ffilled_data['normalized'] = ffilled_data['slit_det'] / ffilled_data['I'] * ffilled_data['I'].mean()
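
The normalization divides out fluctuations in `I` and then multiplies by the mean current so the result stays on the same scale as the raw counts. A small worked example, with invented numbers in which the detector signal is exactly proportional to the current:

```python
import pandas as pd

# Invented values: slit_det tracks I exactly (slit_det / I is 0.2 in every row).
toy = pd.DataFrame({'slit_det': [100.0, 90.0, 110.0],
                    'I':        [500.0, 450.0, 550.0]})

# Divide by the current, then rescale by the mean current (500 here).
toy['normalized'] = toy['slit_det'] / toy['I'] * toy['I'].mean()
print(toy)
```

All of the variation in this toy `slit_det` comes from the current, so the normalized column is flat at about 100.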

In [None]:
plt.figure()
plt.plot('motor_slit', 'slit_det', data=ffilled_data, label='raw')
plt.plot('motor_slit', 'normalized', data=ffilled_data, label='interpolated and normalized')
plt.legend()
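
As an aside, pandas also offers `pd.merge_asof`, which attaches the most recent monitor reading to each primary row without building the intermediate NaN-padded table. This is an alternative technique, not the approach this notebook uses. The sketch below assumes toy tables with invented values and a float `time` column.

```python
import pandas as pd

# Toy stand-ins for the two streams; both must be sorted by the key column.
primary = pd.DataFrame({'time': [0.5, 1.0], 'slit_det': [100.0, 110.0]})
monitor = pd.DataFrame({'time': [0.3, 0.7, 1.4], 'I': [500.0, 499.0, 498.0]})

# For each primary row, take the latest monitor row at or before its time.
muxed = pd.merge_asof(primary, monitor, on='time', direction='backward')
print(muxed)
```

The result has one row per primary event, with `I` chosen the same way forward-fill would choose it.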

### More accurate: use linear interpolation instead of "forward-fill"

In [None]:
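# method='index' interpolates linearly against the time index itself;
# method='linear' would instead treat the rows as equally spaced.
interp_data = sorted_data.interpolate(method='index')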
interp_data['normalized'] = interp_data['slit_det'] / interp_data['I'] * interp_data['I'].mean()
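
One detail of pandas interpolation matters when readings are unevenly spaced in time: `method='linear'` ignores the index and treats rows as equally spaced, while `method='index'` weights by the actual index values. A minimal sketch with invented values:

```python
import numpy as np
import pandas as pd

# Toy series with unevenly spaced time stamps (invented values).
s = pd.Series([0.0, np.nan, 10.0],
              index=pd.Index([0.0, 9.0, 10.0], name='time'))

by_row = s.interpolate(method='linear')   # ignores the index spacing
by_time = s.interpolate(method='index')   # uses the index values

print(by_row.loc[9.0], by_time.loc[9.0])
```

The missing point sits at `time = 9.0`, nine tenths of the way between its neighbors. Row-based interpolation places it at the midpoint of the neighboring values, while index-based interpolation places it nine tenths of the way along.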

In [None]:
plt.figure()
plt.plot('motor_slit', 'slit_det', data=ffilled_data, label='raw')
plt.plot('motor_slit', 'normalized', data=ffilled_data, label='ffill')
plt.plot('motor_slit', 'normalized', data=interp_data, label='linear')
plt.legend()

### More accurate: down-sample current

In [None]:
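# Average the current over a 3-point rolling window. This keeps every row,
# so it smooths the signal rather than strictly down-sampling it.
sm_I = header.table('I_monitor').set_index('time')['I'].rolling(window=3).mean()
# Stack the primary table and the smoothed current, then sort by time.
sm_data = pd.concat([header.table('primary').set_index('time'), pd.DataFrame({'I': sm_I})], axis=0)
sorted_sm_data = sm_data.sort_index()
# method='index' interpolates against the actual time stamps.
interp_sm_data = sorted_sm_data.interpolate(method='index')
interp_sm_data['normalized'] = interp_sm_data['slit_det'] / interp_sm_data['I'] * interp_sm_data['I'].mean()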

plt.figure()
plt.plot('motor_slit', 'slit_det', data=ffilled_data, label='raw')
plt.plot('motor_slit', 'normalized', data=ffilled_data, label='ffill')
plt.plot('motor_slit', 'normalized', data=interp_data, label='linear')
plt.plot('motor_slit', 'normalized', data=interp_sm_data, label='downsampled + linear')
plt.legend()
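
To make the rolling-mean step concrete, here is a small sketch on an invented current trace. It uses the same `rolling(window=3).mean()` call as above on a plain Series.

```python
import pandas as pd

# Invented current readings with some noise around 500.
I_toy = pd.Series([500.0, 503.0, 497.0, 500.0, 506.0])

# Each output value is the mean of the current reading and the two before it.
smoothed = I_toy.rolling(window=3).mean()
print(smoothed)
```

The first `window - 1` entries are NaN because a full window is not yet available, and the output has as many rows as the input. The rolling mean therefore reduces noise without reducing the number of samples.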