In [None]:
# Start up the databrokers and Elasticsearch indexes
import matplotlib.pyplot as plt
%matplotlib qt5
from pprint import pprint

from rapidz.graph import _clean_text, readable_graph
from xpdan.vend.callbacks.core import Retrieve
from xpdan.vend.callbacks.zmq import Publisher
from xpdconf.conf import glbl_dict

from databroker_elasticsearch import load_elasticindex
from databroker_elasticsearch.brokersearch import BrokerSearch

from databroker import Broker
import yaml
import esconverters

# Build one Broker per YAML config file (raw.yml and an.yml), keyed by file stem.
dbs = {}
for yaml_file in ['raw', 'an']:
    with open(f'{yaml_file}.yml', 'r') as f:
        # safe_load instead of load: calling yaml.load() without an explicit
        # Loader is deprecated since PyYAML 5.1 and raises TypeError on
        # PyYAML >= 6. The safe loader also refuses arbitrary Python object tags.
        dbs[yaml_file] = Broker.from_config(yaml.safe_load(f))

an_db = dbs['an']
raw_db = dbs['raw']

# Index handles, each loaded from its own es-*.yaml file.
raw_es = load_elasticindex('es-raw.yaml')
an_es = load_elasticindex('es-an.yaml')

# Pair each broker with its index; the resulting objects are called with a
# query string in later cells.
raw_db_es = BrokerSearch(raw_db, raw_es)
an_db_es = BrokerSearch(an_db, an_es)

In [None]:
# Search the raw index for the dinosaur tooth sample
raw_es.qsearch()


def vqraw(q):
    """Pretty-print the query string alongside what raw_es.qsearch returns for it."""
    pprint((q, raw_es.qsearch(q)))


for raw_query in (
    'tooth',                 # no field given: search all fields
    'dinosaur',              # no field given: search all fields
    'sample_name:dinosaur',  # restrict the search to one field
    'dino*',                 # glob-like search
    'dinosaurus~2',          # fuzzy search, max edit distance of 2
):
    vqraw(raw_query)

# Take the first header the combined broker/index search yields for the fuzzy
# query and remember its uid for the analysis queries.
matching_raw_hdrs = iter(raw_db_es('dinosaurus~2'))
raw_hdr = next(matching_raw_hdrs)
uid = raw_hdr.start['uid']

In [None]:
# Run the same kind of queries against the analysis index
def vqan(q):
    """Pretty-print the query string alongside what an_es.qsearch returns for it."""
    pprint((q, an_es.qsearch(q)))


for an_query in (
    'img_sinogram',
    f'puid:{uid[:6]}*',  # word in puid (first six characters of the raw uid)
    'analysis_stage:img_sinogram',
    'usednodes.ndfunc:*sort_sinogram',
    # the next two query the algorithm kwarg
    'gridrec',
    'usednodes.ndkwargs.algorithm:gridrec',
):
    vqan(an_query)

In [None]:
# Pick the analysis header of the tomo reconstruction that will be replayed.
# Disabled alternative: look the header up through the analysis search index
# instead of by position.
# hdrs = an_db_es('analysis_stage:*tomo*')
# tomo_analysis_hdr = next(iter(hdrs))
# NOTE(review): the header is selected by position (-2) in the analysis
# databroker, so which run this is depends on what was saved most recently —
# confirm it is the tomo reconstruction before replaying.
tomo_analysis_hdr = an_db[-2]

In [None]:
# show the graph
from shed.replay import replay
from shed.translation import merkle_hash

# Rebuild the replay objects for the chosen analysis header from the raw databroker
graph, parents, data, vs = replay(raw_db, tomo_analysis_hdr)

# Make the graph friendlier to humans: label every node with the cleaned-up
# text of its stream. These names *should* match the names in the graph plot.
for node_attrs in graph.nodes.values():
    node_attrs['label'] = _clean_text(str(node_attrs['stream'])).strip()
graph = readable_graph(graph)

# Plot the graph, starting from the image source node
img_source_node = graph.nodes['data img FromEventStream']
img_source_node['stream'].visualize()

In [None]:
# Compare the graph hash stored in the analysis header with the hash of the
# graph as currently loaded.
# The two pipelines are different because one has the sinogram data processing as well
old_hash = tomo_analysis_hdr.start['graph_hash']
print(old_hash)
# Use graph.nodes, as the rest of this notebook does: the graph.node alias was
# deprecated in NetworkX 2.x and removed in 2.4, where it raises AttributeError.
current_hash = merkle_hash(graph.nodes[tomo_analysis_hdr.start['outbound_node']]['stream'])
print(current_hash)

In [None]:
# Set up a publisher that sends documents on to data viz and capture
p = Publisher(glbl_dict['inbound_proxy_address'], prefix=b'tomo')

# Extend the tomo output node with LastCache and DBFriendly, then sink the
# resulting stream into the publisher.
tomo_out_stream = graph.nodes['img_tomo ToEventStream']['stream']
z = tomo_out_stream.LastCache().DBFriendly()
z.starsink(p)

In [None]:
# Replay the analysis exactly as it was recorded (no changes to the graph)
r = Retrieve(dbs['raw'].reg.handler_reg)
for v in vs:
    # Look up the stored payload for this entry, pass it through the retriever,
    # and push the result into the parent node it belongs to.
    retrieved = r(*data[v['uid']])
    parents[v["node"]].update(retrieved)

In [None]:
# Re-create the brokers and index handles, then compare the graph hash of the
# latest analysis run with the hash of the current graph.
dbs = {}
for yaml_file in ['raw', 'an']:
    with open(f'{yaml_file}.yml', 'r') as f:
        # safe_load instead of load: calling yaml.load() without an explicit
        # Loader is deprecated since PyYAML 5.1 and raises TypeError on
        # PyYAML >= 6.
        dbs[yaml_file] = Broker.from_config(yaml.safe_load(f))
# NOTE(review): register_converter is not used in this cell; the import is kept
# in case importing the module has side effects — confirm and drop if not.
from databroker_elasticsearch.converters import register_converter

an_db = dbs['an']
raw_db = dbs['raw']
raw_es = load_elasticindex('es-raw.yaml')
an_es = load_elasticindex('es-an.yaml')
raw_db_es = BrokerSearch(raw_db, raw_es)
an_db_es = BrokerSearch(an_db, an_es)

# Graph hash recorded on the most recent analysis header vs. the current graph's hash
print(an_db[-1].start['graph_hash'])
print(current_hash)

In [None]:
# Switch the reconstruction node's 'algorithm' kwarg to 'art' (change to Algebra),
# printing the node's kwargs before and after the change.
recon_stream = graph.nodes['starmap; recon_wrapper']['stream']
print(recon_stream.kwargs)
recon_stream.kwargs['algorithm'] = 'art'
print(recon_stream.kwargs)

# Replay again, now with the modified graph
r = Retrieve(dbs['raw'].reg.handler_reg)
for v in vs:
    # Look up the stored payload for this entry, pass it through the retriever,
    # and push the result into the parent node it belongs to.
    retrieved = r(*data[v['uid']])
    parents[v["node"]].update(retrieved)

In [None]:
# Re-create the brokers and index handles, then compare the graph hash of the
# latest analysis run with the hash of the current graph.
dbs = {}
for yaml_file in ['raw', 'an']:
    with open(f'{yaml_file}.yml', 'r') as f:
        # safe_load instead of load: calling yaml.load() without an explicit
        # Loader is deprecated since PyYAML 5.1 and raises TypeError on
        # PyYAML >= 6.
        dbs[yaml_file] = Broker.from_config(yaml.safe_load(f))
# NOTE(review): register_converter is not used in this cell; the import is kept
# in case importing the module has side effects — confirm and drop if not.
from databroker_elasticsearch.converters import register_converter

an_db = dbs['an']
raw_db = dbs['raw']
raw_es = load_elasticindex('es-raw.yaml')
an_es = load_elasticindex('es-an.yaml')
raw_db_es = BrokerSearch(raw_db, raw_es)
an_db_es = BrokerSearch(an_db, an_es)
# Graph hash recorded on the most recent analysis header vs. the current graph's hash
print(an_db[-1].start['graph_hash'])
print(current_hash)

In [None]:
# Query the analysis index for the new data (produced with the new recon algorithm)
dbs = {}
for yaml_file in ['raw', 'an']:
    with open(f'{yaml_file}.yml', 'r') as f:
        # safe_load instead of load: calling yaml.load() without an explicit
        # Loader is deprecated since PyYAML 5.1 and raises TypeError on
        # PyYAML >= 6.
        dbs[yaml_file] = Broker.from_config(yaml.safe_load(f))
# NOTE(review): register_converter is not used in this cell; the import is kept
# in case importing the module has side effects — confirm and drop if not.
from databroker_elasticsearch.converters import register_converter

an_db = dbs['an']
raw_db = dbs['raw']
raw_es = load_elasticindex('es-raw.yaml')
an_es = load_elasticindex('es-an.yaml')
raw_db_es = BrokerSearch(raw_db, raw_es)
an_db_es = BrokerSearch(an_db, an_es)


def vqan(q):
    """Pretty-print the query string alongside what an_es.qsearch returns for it.

    Defined again here so that it is bound to the freshly loaded an_es handle.
    """
    pprint((q, an_es.qsearch(q)))


vqan('art')

In [None]:
# Re-create the brokers and index handles before fetching the two reconstructions.
dbs = {}
for yaml_file in ['raw', 'an']:
    with open(f'{yaml_file}.yml', 'r') as f:
        # safe_load instead of load: calling yaml.load() without an explicit
        # Loader is deprecated since PyYAML 5.1 and raises TypeError on
        # PyYAML >= 6.
        dbs[yaml_file] = Broker.from_config(yaml.safe_load(f))
# NOTE(review): register_converter is not used in this cell; the import is kept
# in case importing the module has side effects — confirm and drop if not.
from databroker_elasticsearch.converters import register_converter

an_db = dbs['an']
raw_db = dbs['raw']
raw_es = load_elasticindex('es-raw.yaml')
an_es = load_elasticindex('es-an.yaml')
raw_db_es = BrokerSearch(raw_db, raw_es)
an_db_es = BrokerSearch(an_db, an_es)


# First analysis header found for each reconstruction algorithm
art_hdrs = iter(an_db_es('usednodes.ndkwargs.algorithm:art'))
hdr1 = next(art_hdrs)
gridrec_hdrs = iter(an_db_es('usednodes.ndkwargs.algorithm:gridrec'))
hdr2 = next(gridrec_hdrs)

# First 'img_tomo' entry from each header's 'final_primary' stream
art = next(hdr1.data('img_tomo', stream_name='final_primary'))
grid = next(hdr2.data('img_tomo', stream_name='final_primary'))

# Compare results: art, gridrec, and their difference side by side
fig, axs = plt.subplots(1, 3, tight_layout=True)
for ax, img in zip(axs[:2], (art, grid)):
    ax.imshow(img)
difference = art - grid
axs[-1].imshow(difference)