# Sample ingest, tag and query data.

This notebook demonstrates how to use Splash-ML to ingest data into databroker using the ETL package, how to save tag sets using the TagService in the tagging packages, and how to query on those tags.

The notebook uses mongomock to mimic a mongo database instance in memory.

In [116]:
import sys; sys.path.insert(0, '../..')
import datetime
import glob
import os
import tempfile
from mongomock import MongoClient
from suitcase.mongo_normalized import Serializer

import etl.ingest
from tagging.tag_service import TagService

First, let's do some setup. We'll create a monomock instance which will be used by as a location to ingest data into and as a place to save and search on tags.

The `ETLExecutor` does a variety of things. It's provided an `input_root`, which is a directory that the `ETLExecutor` will scan recursively, finding assets to trasform and ingest into databroker.  The transformed original files and transformed files are stored in an anonymized location, specified by the `output_root` parameter. Finally, a bluesky document stream is produced.


An optional parameter can be provided: `properties_callback`. The takes a callable that can be used to extract properties data from each file as it is ingested. One usecase for this is that many existing datasets are saved in large file systems with much metadata implied by the file path and file names. This callback can be used to extract that metdata as the asset is ingested.

Information from each file's transformations can then be used in the ETLExecutor.createDocument, which is a databroker 'ingestor', creating a document set for the asset and its "thumbnails"

Let's create a properties_callback. This callable is used by the `ETLExecutor` to build up a list of detected properties as it 

In [119]:
def properties_callback(path):
    metadata = {}
    if 'agb' in path:
        metadata['scan_type'] = 'agb_calibration'
    return metadata

tagging_event_uid = ''

# create a tagger and tagging event reference for tags to reference
def setup_tagging():
    now = datetime.datetime.utcnow().replace(tzinfo = datetime.timezone.utc) \
        .astimezone().replace(microsecond = 0).isoformat()
    tagger_uid = tag_svc.create_tagger(
        {
            'uid': None,
            'type': 'model',
            'model_name': 'sample notebook',
            'create_time': now
        })

    tagging_event_uid = tag_svc.create_tagging_event(
        {
            'uid': None,
            'tagger_id': None,
            'run_time': now,
            'accuracy': 0.9234
        },
        tagger_uid)


def make_tag_set(start_doc):
    tags = []
    if start_doc.get('scan_type') == 'agb_calibration':
        tags.append({
            'tag': 'agb',
            'confidence': 0.99
        })
    else:
        tags.append({
            'tag': 'sample',
            'confidence': 0.99
        })  
    
    return {
        'sample_id': 'random',
        'tags': tags
    }


db = MongoClient()
input_dir = 'source'
output_dir = tempfile.TemporaryDirectory().name
print(output_dir)
#use glob to find all the files to ingest
paths = glob.glob(os.path.join(input_dir + '/**/*.*'), recursive=True)
etl_executor = ETLExecutor(input_dir, output_dir, properties_callback)
serializer = Serializer(
        db['databroker_db_name'],
        db['databroker_db_name'])

tag_svc = TagService(db, db_name='tagging')
setup_tagging()
paths

/tmp/tmpa5d99y7l


['source/radscan.tiff', 'source/agb.tiff']

OK, now that we've set everything up, let's go ahead and execute. We're pointed it the included source directory, that contains two sample tiff files.

We're going to execute, instructing the `etl_executor` to convert files to jpg and npy.

In [120]:
for file_path in paths:
    print(file_path)

    # extract and transform each file, creating a small jpg and small npy file
    raw_metadata, thumb_metadatas, return_metadata = etl_executor.execute(
                        file_path, [(223, 'jpg'), (223, 'npy')])
    print("\nreading and transforming file: " + file_path)

    # generate documentstream for each 
    docs = etl.ingest.createDocument(raw_metadata, output_dir,
                    thumb_metadatas, return_metadata)

    for name, doc in docs:
        # serialize the documents (into mongo)
        serializer(name, doc)
        if name == 'start':
            print("Creating tags for: " + repr(return_metadata))

            # now let's create tag sets in the TagService
            print(doc)
            tag_set = make_tag_set(doc)
            tag_svc.create_asset_tags(tag_set, tagging_event_uid)


source/radscan.tiff

reading and transforming file: source/radscan.tiff
Creating tags for: {}
{'uid': '4f1573eb-41dc-44e7-9de5-8926b3ad639c', 'time': 1600040902.95923}
source/agb.tiff

reading and transforming file: source/agb.tiff
Creating tags for: {'scan_type': 'agb_calibration'}
{'uid': 'b4d73672-acbf-4d3a-bce3-6b4aedc82664', 'time': 1600040902.9837074, 'scan_type': 'agb_calibration'}


Now that we have loaded the tagging database, we can do some queries on what we have. First, find random tagging events.

In [121]:
list(tag_svc.find_random_asset_sets(1))

[{'sample_id': 'random',
  'tags': [{'tag': 'agb', 'confidence': 0.99, 'event_id': ''}],
  'uid': 'bebade35-dd79-4ff7-a00b-0d738945e919',
  'schema_version': '0.01',
  '_id': ObjectId('5f5eafc6d1a5ea91881fda46')}]

We query based on tags. (note that this signature will be enhanced to make confidence parameters a range and optional)

In [122]:
list(tag_svc.find_tags_one_filter('sample', 0.99))

[{'sample_id': 'random',
  'tags': [{'tag': 'sample', 'confidence': 0.99, 'event_id': ''}],
  'uid': 'f7e732de-6f91-451d-85fd-11983665a879',
  'schema_version': '0.01',
  '_id': ObjectId('5f5eafc6d1a5ea91881fda39')}]