In [1]:
from databroker import db, get_table

from metadatastore.commands import (insert_run_start, find_run_starts,
                                    insert_descriptor, find_descriptors,
                                    insert_event, find_events,
                                    insert_run_stop, find_run_stops,
                                    descriptors_by_start, get_events_generator)

from ixstools.io import Specfile
import uuid
import os
from metadatastore.utils.testing import mds_setup, mds_teardown

In [2]:
# Set up the metadatastore test database this notebook writes into
# (presumably a temporary one; mds_teardown() in the final cell is its counterpart).
mds_setup()

In [3]:
# Directory containing the spec file. Set IXSTOOLS_SPEC_DIR to point elsewhere
# instead of editing this machine-specific default.
specpath = os.environ.get('IXSTOOLS_SPEC_DIR', '/home/edill/dev/python/ixstools/ixstools/')
# os.path.join does not add a second separator when specpath already ends
# with '/', unlike `specpath + os.sep + name`.
sf = Specfile(os.path.join(specpath, '20160219.spec'))

In [4]:
def run_start(specscan, beamline_id, **md):
    """Insert a RunStart document describing ``specscan``.

    Parameters
    ----------
    specscan : scan object
        One scan from a ``Specfile``; its date, scan id, arguments and
        command, plus its parent file's name and header 'user', fill the
        document.
    beamline_id : str
        Stored as the document's ``beamline_id``.
    **md
        Extra metadata merged in last, so it overrides any default field.

    Returns
    -------
    Whatever ``insert_run_start`` returns; callers use it as the start uid.
    """
    specfile = specscan.specfile
    start_doc = dict(
        time=specscan.time_from_date.timestamp(),
        scan_id=specscan.scan_id,
        beamline_id=beamline_id,
        uid=str(uuid.uuid4()),
        specpath=specfile.filename,
        owner=specfile.parsed_header['user'],
        plan_args=specscan.scan_args,
        scan_type=specscan.scan_command,
    )
    start_doc.update(md)
    return insert_run_start(**start_doc)

def baseline(specscan, start_uid):
    """Insert the 'baseline' descriptor and its single event for ``specscan``.

    The baseline contains one reading per motor listed in the spec file
    header (keyed by the motor's spec name, with its human-readable name as
    the ``source``) plus h, k and l.  Every reading is stamped with the
    scan's start time.

    Yields
    ------
    ('descriptor', descriptor_uid)
        Yielded after the descriptor is inserted.
    ('event', result)
        ``result`` is the return value of ``insert_event``.  The event is
        only inserted once the generator is resumed after the descriptor.
    """
    timestamp = specscan.time_from_date.timestamp()
    header = specscan.specfile.parsed_header
    data_keys, data, timestamps = {}, {}, {}

    motor_rows = zip(header['motor_spec_names'],
                     header['motor_human_names'],
                     specscan.motor_values)
    for spec_name, pretty_name, reading in motor_rows:
        data_keys[spec_name] = {'dtype': 'number', 'shape': [], 'source': pretty_name}
        data[spec_name] = reading
        timestamps[spec_name] = timestamp

    # h, k and l always get a data key and timestamp.  Their values are
    # paired positionally with specscan.hkl.
    for axis in 'hkl':
        data_keys[axis] = {'dtype': 'number', 'shape': [], 'source': axis}
        timestamps[axis] = timestamp
    data.update(zip('hkl', specscan.hkl))

    descriptor_uid = insert_descriptor(run_start=start_uid, data_keys=data_keys,
                                       time=timestamp, uid=str(uuid.uuid4()),
                                       name='baseline')
    yield 'descriptor', descriptor_uid
    event_result = insert_event(descriptor=descriptor_uid, seq_num=0, time=timestamp,
                                data=data, timestamps=timestamps, uid=str(uuid.uuid4()))
    yield 'event', event_result

def events(specscan, start_uid):
    """Insert the 'primary' descriptor and one event per row of the scan data.

    Every column in ``specscan.col_names`` becomes a numeric data key.  Each
    row of ``specscan.scan_data`` becomes one event whose time is the scan
    start time plus that row's 'Epoch' value.

    Yields
    ------
    ('descriptor', descriptor_uid)
        Yielded once, after the descriptor is inserted.
    ('event', result)
        One per row; ``result`` is the return value of ``insert_event``.
    """
    timestamp = specscan.time_from_date.timestamp()
    data_keys = {col: {'dtype': 'number', 'shape': [], 'source': col}
                 for col in specscan.col_names}
    descriptor_uid = insert_descriptor(run_start=start_uid, data_keys=data_keys,
                                       time=timestamp, uid=str(uuid.uuid4()),
                                       name='primary')
    yield 'descriptor', descriptor_uid

    # Every datum is stamped with the scan start time.  Only the event time
    # is offset by the row's 'Epoch' value.
    timestamps = {col: timestamp for col in specscan.col_names}
    for seq_num, (_, row) in enumerate(specscan.scan_data.iterrows()):
        row_data = dict(zip(row.index, row))
        # NOTE(review): in spec files the Epoch column is usually seconds since
        # the file's #E epoch rather than the scan's #D date -- confirm this
        # offset is correct.
        yield 'event', insert_event(data=row_data, descriptor=descriptor_uid,
                                    seq_num=seq_num,
                                    time=timestamp + row_data['Epoch'],
                                    timestamps=timestamps, uid=str(uuid.uuid4()))

def stop(specscan, start_uid, **md):
    """Insert the RunStop for ``start_uid`` and yield ``('stop', result)``.

    ``result`` is the return value of ``insert_run_stop``.  Any ``md`` is
    forwarded to ``insert_run_stop`` as keyword arguments.
    """
    # NOTE(review): the stop time reuses the scan's start date, not the time
    # of the last event.
    stop_time = specscan.time_from_date.timestamp()
    stop_uid = str(uuid.uuid4())
    result = insert_run_stop(run_start=start_uid, time=stop_time, uid=stop_uid, **md)
    yield 'stop', result

In [None]:
def to_document_stream(specscan, beamline_id):
    """Insert every document for one spec scan, yielding ``(name, result)`` pairs.

    Stream order: 'start', then the baseline descriptor and event, then the
    primary descriptor and one event per row, then 'stop'.  Before the stop
    document is written, the inserted documents are read back as a sanity
    check.  A scan with fewer primary events than ``specscan.num_points`` is
    recorded with ``exit_status='abort'``.

    Parameters
    ----------
    specscan : scan object
        One scan from a ``Specfile``.
    beamline_id : str
        Forwarded to ``run_start``.

    Yields
    ------
    tuple of (str, object)
        Document name and the value returned by the corresponding insert call.

    Raises
    ------
    AssertionError
        If the read-back does not find exactly one 'baseline' and one
        'primary' descriptor, or the baseline does not have exactly one event.
    """
    start_uid = run_start(specscan, beamline_id)
    yield 'start', start_uid
    yield from baseline(specscan, start_uid)
    yield from events(specscan, start_uid)

    # Sanity checks: read back what was just inserted.  The descriptors are
    # selected by name, so the order returned by descriptors_by_start does
    # not matter.
    descriptors = descriptors_by_start(start_uid)
    assert len(descriptors) == 2
    by_name = {descriptor.name: descriptor for descriptor in descriptors}
    assert set(by_name) == {'baseline', 'primary'}, list(by_name)
    baseline_events = list(get_events_generator(by_name['baseline']))
    assert len(baseline_events) == 1
    primary_events = list(get_events_generator(by_name['primary']))

    # num_points is treated as the expected number of rows.  A scan with
    # exactly that many rows is complete, so only strictly fewer rows mean
    # the scan was aborted.
    if len(primary_events) < specscan.num_points:
        message = ('scan %s only has %s/%s points. Assuming scan was aborted. start_uid=%s'
                   % (specscan.scan_id, len(primary_events), specscan.num_points, start_uid))
        print(message)
        # exit_status is the RunStop field that marks an aborted run;
        # reason carries the human-readable explanation.
        stop_md = {'exit_status': 'abort', 'reason': message}
    else:
        stop_md = {'exit_status': 'success'}
    yield from stop(specscan, start_uid, **stop_md)
    

In [None]:
# Convert every scan in the spec file.  `stream` is left holding the
# documents of the last scan, which the cells below inspect.
# NOTE(review): re-running this cell inserts every scan again under new uids.
for specscan in sf:
    stream = list(to_document_stream(specscan, beamline_id='ixs'))

scan 1 only has 12/34 points. Assuming scan was aborted. start_uid=d87b6ccf-d45e-485f-b933-3caedc95438e
scan 2 only has 31/34 points. Assuming scan was aborted. start_uid=863e687f-4b56-4dcf-a6d2-6af695493e15
scan 14 only has 18/34 points. Assuming scan was aborted. start_uid=c7a01787-97ee-4cba-a1dc-0f06908e7e06
scan 15 only has 15/34 points. Assuming scan was aborted. start_uid=6e1d36b2-dfa9-4c1a-a575-4d54bbb4f4fd

In [None]:
# Query by the filename the start documents actually recorded (run_start
# stores specscan.specfile.filename as 'specpath').  A re-typed literal path
# can differ from it, e.g. by a doubled path separator.
# NOTE(review): assumes each specscan.specfile is `sf` itself.
headers = db(specpath=sf.filename)

In [None]:
# Every scan in the spec file should have produced exactly one run.
# The message shows (found, expected) on failure.
assert len(headers) == len(sf), (len(headers), len(sf))

In [None]:
# Number of scans in the spec file.
len(sf)

In [None]:
# Number of run headers the databroker query returned for this spec file.
len(headers)

In [None]:
# Show the (name, result) pairs yielded by to_document_stream for the last
# scan converted above.
stream

In [None]:
# stream[0] is ('start', start_uid): look up that run and show its start document.
db[stream[0][1]].start

In [None]:
# First descriptor of that run.  Descriptor order is not guaranteed, so this
# may be either the 'baseline' or the 'primary' descriptor.
print(db[stream[0][1]].descriptors[0])

In [None]:
# Second descriptor of the same run (the other of 'baseline' / 'primary').
print(db[stream[0][1]].descriptors[1])

In [None]:
# Stop document of the same run.
print(db[stream[0][1]].stop)

In [None]:
# Undo the metadatastore test setup performed by mds_setup() at the top of the notebook.
mds_teardown()