In [1]:
import itertools
import uuid
import time
from collections import OrderedDict
from contextlib import contextmanager

from filestore.api import (bulk_insert_datum, insert_resource)
from metadataclient.mds import MDS
from portable_mds.mongoquery.mds import MDS as MQMDS
from portable_mds.sqlite.mds import MDS as SQMDS
from portable_mds.hdf5.mds import MDS as H5MDS

In [2]:
local_cfg = {'directory': '/Users/arkilic/local_mds_files'}
online_cfg = {'host': 'localhost',
             'port': 7778}
mds = MDS(online_cfg)
mqmds = MQMDS(local_cfg)
sqmds = SQMDS(local_cfg)
h5mds = H5MDS(local_cfg)

In [3]:
class ctxtime(object):
    def __enter__(self):
        self.t0 = time.time()
        return self

    def __exit__(self, type, value, traceback):
        self.elapsed = time.time() - self.t0


def new_uid():
    "uuid4 as a string"
    return str(uuid.uuid4())

In [4]:
class FSBase:
    def generate_datum(self, key, timestamp):
        "Generate a uid and cache it with its key for later insertion."
        # if self._locked_key_list:
        #     if key not in self._datum_uids:
        #         raise RuntimeError("modifying after lock")
        uid = new_uid()
        reading = {'value': uid, 'timestamp': timestamp}
        # datum_uids looks like {'dark': [reading1, reading2], ...}
        self._datum_uids[key].append(reading)
        return uid


class BulkReadableDetector:
    def __init__(self, path='fake_path', *, spec='AD_HDF5'):
        self.spec = spec
        self._resource = insert_resource(self.spec, path)
        self._datum_kwargs_map = {}
        self._reset_data()

    def _reset_data(self):
        self._datum_uids = []
        self._datum_kwargs_map = {}
        self._point_counter = itertools.count()

    def generate_datum(self, key, timestamp):
        "Stash kwargs for each datum, to be used below by unstage."
        uid = super().generate_datum(key, timestamp)
        i = next(self._point_counter)
        self._datum_kwargs_map[uid] = {'point_number': i}
        # (don't insert, obviously)
        return uid

    def bulk_read(self, timestamps):
        image_name = self.image_name

        uids = [self.generate_datum(self.image_name, ts) for ts in timestamps]
        datum_args = [self._datum_kwargs_map[uid] for uid in uids]

        bulk_insert_datum(self._resource, uids, datum_args)

        # clear so unstage will not save the images twice:
        self._reset_data()
        return {image_name: uids}


class Xspress3HDF5:
    spec = 'XSP_HDF5'

    def __init__(self, path):
        self._filestore_res = insert_resource(self.spec, path)
        self.mds_keys = {1: 'xspress3_ch1',
                         2: 'xspress3_ch2',
                         3: 'xspress3_ch3',
                         }


class BulkReadableXspress3(FSBase):
    def __init__(self, channels, path='fake_path'):
        self.hdf5 = Xspress3HDF5(path)
        self.channels = channels

    def bulk_read(self, timestamps):
        fs_res = self.hdf5._filestore_res

        if timestamps is None:
            raise ValueError('Timestamps must be set first')

        channels = self.channels
        ch_uids = {ch: [str(uuid.uuid4()) for ts in timestamps]
                   for ch in channels}

        count = len(timestamps)
        if count == 0:
            return {}

        def get_datum_args():
            for ch in channels:
                for seq_num in range(count):
                    yield {'frame': seq_num,
                           'channel': ch}

        uids = [ch_uids[ch] for ch in channels]
        bulk_insert_datum(fs_res, itertools.chain(*uids),
                          get_datum_args())
        return OrderedDict((self.hdf5.mds_keys[ch], ch_uids[ch])
                           for ch in channels)




In [5]:
@contextmanager
def mds_wrapper(keynames, *, scan_id=0):
    data_keys = {key: dict(source='somepv',
                           shape=None,
                           dtype='str')
                 for key in keynames
                 }
    run_start = mds.insert_run_start(scan_id=scan_id, beamline_id='csx',
                                 time=time.time(),
                                 uid=str(uuid.uuid4()), function='cos')

    descriptor = mds.insert_descriptor(data_keys=data_keys, time=time.time(),
                                   run_start=run_start, uid=str(uuid.uuid4()))
    yield descriptor

    mds.insert_run_stop(run_start, time=time.time(), uid=str(uuid.uuid4()))

@contextmanager
def mqmds_wrapper(keynames, *, scan_id=0):
    data_keys = {key: dict(source='somepv',
                           shape=None,
                           dtype='str')
                 for key in keynames
                 }
    run_start = mqmds.insert_run_start(scan_id=scan_id, beamline_id='csx',
                                 time=time.time(),
                                 uid=str(uuid.uuid4()), function='cos')

    descriptor = mqmds.insert_descriptor(data_keys=data_keys, time=time.time(),
                                   run_start=run_start, uid=str(uuid.uuid4()))
    yield descriptor

    mqmds.insert_run_stop(run_start, time=time.time(), uid=str(uuid.uuid4()))

@contextmanager
def sqmds_wrapper(keynames, *, scan_id=0):
    data_keys = {key: dict(source='somepv',
                           shape=None,
                           dtype='str')
                 for key in keynames
                 }
    run_start = sqmds.insert_run_start(scan_id=scan_id, beamline_id='csx',
                                 time=time.time(),
                                 uid=str(uuid.uuid4()), function='cos')
    print(run_start)
    descriptor = sqmds.insert_descriptor(data_keys=data_keys, time=time.time(),
                                   run_start=run_start, uid=str(uuid.uuid4()))
    yield descriptor

    sqmds.insert_run_stop(run_start, time=time.time(), uid=str(uuid.uuid4()))


@contextmanager
def h5mds_wrapper(keynames, *, scan_id=0):
    data_keys = {key: dict(source='somepv',
                           shape=None,
                           dtype='str')
                 for key in keynames
                 }
    run_start = h5mds.insert_run_start(scan_id=scan_id, beamline_id='csx',
                                 time=time.time(),
                                 uid=str(uuid.uuid4()), function='cos')

    descriptor = h5mds.insert_descriptor(data_keys=data_keys, time=time.time(),
                                   run_start=run_start, uid=str(uuid.uuid4()))
    yield descriptor

    h5mds.insert_run_stop(run_start, time=time.time(), uid=str(uuid.uuid4()))



In [8]:
def test_xspress3_online(n):
    t0 = time.time()
    timestamps = [t0 + i for i in range(n)]
    xspress3 = BulkReadableXspress3(channels=[1, 2, 3])
    with ctxtime() as fstime:
        keys = list(xspress3.hdf5.mds_keys.values())
        with mds_wrapper(keys) as descriptor:
            entries = xspress3.bulk_read(timestamps)
            events = [dict(descriptor=descriptor,
                           seq_num=i,
                           time=ts,
                           timestamps={key: ts for key in keys},
                           data={key: entries[key][i] for key in keys},
                           uid=str(uuid.uuid4())
                           )
                      for i, ts in enumerate(timestamps)
                      ]
            with ctxtime() as mdstime:
                mds.bulk_insert_events(descriptor, events)
            fstime.t0 += mdstime.elapsed

    print('filestore: inserting {} entries took {} sec'.format(n, fstime.elapsed))
    print('metadatastore: inserting {} entries took {} sec'.format(n, mdstime.elapsed))

def test_xspress3_json(n):
    t0 = time.time()
    timestamps = [t0 + i for i in range(n)]
    xspress3 = BulkReadableXspress3(channels=[1, 2, 3])
    with ctxtime() as fstime:
        keys = list(xspress3.hdf5.mds_keys.values())
        with mqmds_wrapper(keys) as descriptor:
            entries = xspress3.bulk_read(timestamps)
            events = [dict(descriptor=descriptor,
                           seq_num=i,
                           time=ts,
                           timestamps={key: ts for key in keys},
                           data={key: entries[key][i] for key in keys},
                           uid=str(uuid.uuid4())
                           )
                      for i, ts in enumerate(timestamps)
                      ]
            with ctxtime() as mdstime:
                mqmds.bulk_insert_events(descriptor, events)
            fstime.t0 += mdstime.elapsed

    print('filestore: inserting {} entries took {} sec'.format(n, fstime.elapsed))
    print('metadatastore: inserting {} entries took {} sec'.format(n, mdstime.elapsed))

def test_xspress3_hdf5(n):
    t0 = time.time()
    timestamps = [t0 + i for i in range(n)]
    xspress3 = BulkReadableXspress3(channels=[1, 2, 3])
    with ctxtime() as fstime:
        keys = list(xspress3.hdf5.mds_keys.values())
        with h5mds_wrapper(keys) as descriptor:
            entries = xspress3.bulk_read(timestamps)
            events = [dict(descriptor=descriptor,
                           seq_num=i,
                           time=ts,
                           timestamps={key: ts for key in keys},
                           data={key: entries[key][i] for key in keys},
                           uid=str(uuid.uuid4())
                           )
                      for i, ts in enumerate(timestamps)
                      ]
            with ctxtime() as mdstime:
                h5mds.bulk_insert_events(descriptor, events)
            fstime.t0 += mdstime.elapsed

    print('filestore: inserting {} entries took {} sec'.format(n, fstime.elapsed))
    print('metadatastore: inserting {} entries took {} sec'.format(n, mdstime.elapsed))



def test_xspress3_sqlite(n):
    t0 = time.time()
    timestamps = [t0 + i for i in range(n)]
    xspress3 = BulkReadableXspress3(channels=[1, 2, 3])
    with ctxtime() as fstime:
        keys = list(xspress3.hdf5.mds_keys.values())
        with sqmds_wrapper(keys) as descriptor:
            entries = xspress3.bulk_read(timestamps)
            events = [dict(descriptor=descriptor,
                           seq_num=i,
                           time=ts,
                           timestamps={key: ts for key in keys},
                           data={key: entries[key][i] for key in keys},
                           uid=str(uuid.uuid4())
                           )
                      for i, ts in enumerate(timestamps)
                      ]
            with ctxtime() as mdstime:
                sqmds.bulk_insert_events(descriptor, events)
            fstime.t0 += mdstime.elapsed

    print('filestore: inserting {} entries took {} sec'.format(n, fstime.elapsed))
    print('metadatastore: inserting {} entries took {} sec'.format(n, mdstime.elapsed))


    
    
    


In [16]:
for i in range(10):
    test_xspress3_online(16000)

filestore: inserting 16000 entries took 9.630425930023193 sec
metadatastore: inserting 16000 entries took 4.0162670612335205 sec
filestore: inserting 16000 entries took 20.281147003173828 sec
metadatastore: inserting 16000 entries took 2.6307640075683594 sec
filestore: inserting 16000 entries took 6.617537975311279 sec
metadatastore: inserting 16000 entries took 2.3965909481048584 sec
filestore: inserting 16000 entries took 6.3482630252838135 sec
metadatastore: inserting 16000 entries took 1.896867036819458 sec
filestore: inserting 16000 entries took 5.171757936477661 sec
metadatastore: inserting 16000 entries took 1.9283349514007568 sec
filestore: inserting 16000 entries took 4.6648828983306885 sec
metadatastore: inserting 16000 entries took 2.028747081756592 sec
filestore: inserting 16000 entries took 4.508883237838745 sec
metadatastore: inserting 16000 entries took 2.1063239574432373 sec
filestore: inserting 16000 entries took 4.670500755310059 sec
metadatastore: inserting 16000 ent

In [8]:
for i in range(5):
    test_xspress3_hdf5(16000)

TypeError: No conversion path for dtype: dtype('<U36')

In [9]:
for i in range(10):
    test_xspress3_sqlite(16000)

172ebacf-81cc-43ca-a8c0-015960a55ffd
filestore: inserting 16000 entries took 4.6490232944488525 sec
metadatastore: inserting 16000 entries took 0.226456880569458 sec
4e91f9ff-2a07-42e2-be5e-6536fd301cc6
filestore: inserting 16000 entries took 8.269920110702515 sec
metadatastore: inserting 16000 entries took 0.2356119155883789 sec
8e75255e-5d72-4f99-8ae5-4b2cb23bc116
filestore: inserting 16000 entries took 8.145911931991577 sec
metadatastore: inserting 16000 entries took 0.22661614418029785 sec
35b14696-ccc6-4b4f-8880-871f1a2e04ea
filestore: inserting 16000 entries took 4.529531002044678 sec
metadatastore: inserting 16000 entries took 0.22321701049804688 sec
4a1541e5-19f5-4910-989a-5dc4800fcc4c
filestore: inserting 16000 entries took 4.4435508251190186 sec
metadatastore: inserting 16000 entries took 0.2205660343170166 sec
60171974-c36c-4c33-95b4-a18afc7c38fb
filestore: inserting 16000 entries took 4.653646945953369 sec
metadatastore: inserting 16000 entries took 0.22381806373596191 sec


In [None]:
for i in range(10):
    test_xspress3_json(16000)

In [10]:
for i in range(10):
    test_xspress3_sqlite(160000)

760f3a89-026e-4373-89e7-e36d5a622c62
filestore: inserting 160000 entries took 48.32061195373535 sec
metadatastore: inserting 160000 entries took 2.4305450916290283 sec
e34d5ae3-3135-4c6d-af7d-938fc06f2cef
filestore: inserting 160000 entries took 51.57424592971802 sec
metadatastore: inserting 160000 entries took 2.3932900428771973 sec
1b5edbfd-00cd-4ffc-81c2-2f81f68e8bd4


KeyboardInterrupt: 

In [11]:
for i in range(10):
    test_xspress3_sqlite(1600000)

5220d947-0b75-4c9f-80be-024d6b0dfd02
filestore: inserting 1600000 entries took 592.131217956543 sec
metadatastore: inserting 1600000 entries took 24.35251808166504 sec
abd46e27-e475-4bc0-834b-5bf5bb75fb06


KeyboardInterrupt: 