In [1]:
import json
from collections import defaultdict
from time import perf_counter

import yaml
from elasticsearch import Elasticsearch

In [2]:
def get_config(path='fastapi.yml'):
    """Load the ``ELASTICSEARCH`` section of a YAML configuration file.

    Parameters
    ----------
    path : str or path-like, default 'fastapi.yml'
        YAML file to read (relative paths resolve against the working directory).

    Returns
    -------
    The value stored under the top-level ``ELASTICSEARCH`` key, or ``None`` when
    the key is absent or the file is empty. Callers in this notebook index it
    with ``['SESSION_KWARGS']``.
    """
    with open(path, encoding='utf-8') as reader:
        # safe_load returns None for an empty document; treat that as "no settings"
        # instead of failing with AttributeError on None.get.
        conf = yaml.safe_load(reader) or {}

    return conf.get("ELASTICSEARCH")

In [3]:
def generate_dict(dep=1):
    """Return an auto-vivifying nested mapping ``dep`` levels deep.

    * ``dep == 1``: a ``defaultdict`` whose missing keys become plain ``dict``s.
    * ``dep > 1``:  a ``defaultdict`` whose missing keys become
      ``generate_dict(dep - 1)``.
    * otherwise:    a plain, empty ``dict``.

    This lets callers assign ``d['a']['b'][...] = value`` without creating the
    intermediate levels by hand.
    """
    if dep == 1:
        return defaultdict(dict)
    if dep > 1:
        def make_child():
            # Each missing key gets a mapping one level shallower, built lazily.
            return generate_dict(dep - 1)
        return defaultdict(make_child)
    return {}
    

In [4]:
def construct_query(bbox='', bbox_relation='intersect', fields=None, source=True, results_per_page='', page='',
                    query_string='', size=None, **terms):
    """Build an Elasticsearch search body and return it as an indented JSON string.

    Parameters
    ----------
    bbox : optional
        Envelope coordinates; when truthy a ``geo_shape`` filter on
        ``geographicExtent`` is added.
    bbox_relation : str, default 'intersect'
        Spatial relation used by the ``geo_shape`` filter.
    fields : list of str, optional
        Emitted as the top-level ``fields`` list when non-empty.
    source : bool, default True
        When falsy, ``"_source": false`` is emitted.
    results_per_page : int, optional
        Emitted as ``size`` when truthy.
    page : int, optional
        1-based page number; emits ``from = size * (page - 1)``, with size
        defaulting to 10.
    query_string : str, optional
        Appended to ``bool.must`` as a ``query_string`` clause.
    size : int, optional
        Alias for ``results_per_page``, used only when that is not given.
        Previously ``size=`` fell into ``**terms`` and crashed.
    **terms
        ``field=values`` pairs, each added as a ``terms`` clause. String values
        are lower-cased. A single string is treated as a one-element list.

    Returns
    -------
    str
        ``json.dumps(query, indent=2)``.
    """
    must = [{'match_all': {}}]

    for field, values in terms.items():
        if isinstance(values, str):
            # A bare string would otherwise be split into characters.
            values = [values]
        must.append({'terms': {field: [v.lower() if isinstance(v, str) else v for v in values]}})

    bool_query = {'must': must}
    if bbox:
        bool_query['filter'] = {
            'geo_shape': {
                'geographicExtent': {
                    'shape': {'type': 'envelope', 'coordinates': bbox},
                    'relation': bbox_relation,
                }
            }
        }

    my_query = {'query': {'bool': bool_query}}

    if fields:
        my_query['fields'] = fields

    if not source:
        my_query['_source'] = source

    if size is not None and not results_per_page:
        results_per_page = size
    if results_per_page:
        my_query['size'] = results_per_page

    if page:
        my_query['from'] = my_query.get('size', 10) * (page - 1)

    if query_string:
        # Appended to the same list object already placed in the query body.
        must.append({'query_string': {'query': query_string}})

    return json.dumps(my_query, indent=2)

In [5]:
# Module-level client used by search(): the ELASTICSEARCH section of the config
# must provide SESSION_KWARGS, which are passed straight to the client constructor.
conf = get_config()
session_kwargs = conf['SESSION_KWARGS']
es = Elasticsearch(**session_kwargs)

def search(bbox='', bbox_relation='intersect', fields=None, source=True, results_per_page='', page='',
           query_string='', hits=True, size=None, index="stac-moles-test", **terms):
    """Run a search against Elasticsearch and return the hits (or the full response).

    The request body is built by ``construct_query`` and also written to
    ``my_query.json`` for inspection.

    Parameters
    ----------
    bbox, bbox_relation, fields, source, results_per_page, page, query_string, **terms
        Forwarded to ``construct_query``.
    hits : bool, default True
        Return only ``response['hits']['hits']``. Otherwise return the whole response.
    size : int, optional
        Alias for ``results_per_page``, used when that is not given. Without
        this parameter ``size=...`` was forwarded through ``**terms`` as a
        ``terms`` filter.
    index : str, default "stac-moles-test"
        Index to query.
    """
    if size is not None and not results_per_page:
        results_per_page = size

    my_query = construct_query(bbox=bbox, bbox_relation=bbox_relation, fields=fields or [], source=source,
                               results_per_page=results_per_page, page=page, query_string=query_string, **terms)

    # Keep a copy of the last request body for debugging.
    with open('my_query.json', 'w', encoding='utf-8') as f:
        f.write(my_query)

    response = es.search(index=index, body=my_query)

    return response['hits']['hits'] if hits else response


In [6]:
def get_related_objects_observation(my_record):
    """Collect UUIDs of records related to an observation.

    Relations followed:

    * instruments and platforms of the acquisition in ``procedureAcquisition``
      (via its ``instrumentPlatformPair``);
    * for the composite process in ``procedureCompositeProcess``: its
      ``computationComponent`` UUIDs, plus instruments and platforms of every
      acquisition in its ``acquisitionComponent``;
    * the UUIDs listed in ``projects``;
    * observation collections whose ``member`` list contains this observation.

    Parameters
    ----------
    my_record : dict
        ``_source`` of an observation hit.

    Returns
    -------
    list
        Related UUIDs. Duplicates are possible; ``get_related_objects``
        de-duplicates.
    """
    def as_list(value):
        # Tolerate a missing/None field or a single value as well as a list.
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    def pair_uuids(acquisition_hits):
        # Instrument and platform UUIDs of every instrumentPlatformPair in the hits.
        uuids = []
        for acquisition in acquisition_hits:
            for ipp in as_list(acquisition['_source'].get('instrumentPlatformPair')):
                uuids += [ipp['instrument'], ipp['platform']]
        return uuids

    related_uuids = []

    # Instruments and platforms through the acquisition route.
    acquisition_uuid = my_record.get('procedureAcquisition')
    if acquisition_uuid is not None:
        related_uuids += pair_uuids(search(uuid=[acquisition_uuid]))

    # Computations, instruments and platforms through the composite-process route.
    composite_process_uuid = my_record.get('procedureCompositeProcess')
    if composite_process_uuid is not None:
        for composite_process in search(uuid=[composite_process_uuid]):
            cp_source = composite_process['_source']
            related_uuids += as_list(cp_source.get('computationComponent'))

            acquisitions_uuid = as_list(cp_source.get('acquisitionComponent'))
            if acquisitions_uuid:
                # Ask for as many hits as UUIDs; the default page size is 10.
                related_uuids += pair_uuids(
                    search(uuid=acquisitions_uuid, results_per_page=len(acquisitions_uuid)))

    # Projects.
    related_uuids += as_list(my_record.get('projects'))

    # Observation collections that list this observation as a member.
    collections = search(model=['observationcollection'], member=[my_record['uuid']], results_per_page=10000)
    related_uuids += [hit['_source']['uuid'] for hit in collections]

    return related_uuids



def get_related_objects_project(my_record):
    """Return UUIDs of records whose indexed text mentions this project's UUID.

    Runs a ``query_string`` search for the project UUID and collects the
    ``uuid`` field of every hit, excluding the project itself.

    Parameters
    ----------
    my_record : dict
        ``_source`` of a project hit; only ``uuid`` is read.

    Returns
    -------
    list
        UUIDs of the matching records, other than the project.
    """
    uuid = my_record['uuid']
    # results_per_page is search()'s page-size parameter. In the original search(),
    # ``size=`` is forwarded as a terms filter instead.
    hits = search(query_string=uuid, fields=['uuid'], results_per_page=10000, source=False)
    found = [hit['fields']['uuid'][0] for hit in hits if 'uuid' in hit.get('fields', {})]
    # Filter rather than list.remove(): remove() raises if the project's own hit is absent.
    return [related for related in found if related != uuid]

def get_related_objects_collection(my_record):
    """Return UUIDs related to an observation collection.

    Two sources are combined: projects whose indexed text mentions the
    collection UUID, and the UUIDs in the collection's ``member`` list.

    Parameters
    ----------
    my_record : dict
        ``_source`` of an observationcollection hit.

    Returns
    -------
    list
        Project UUIDs followed by member UUIDs.
    """
    uuid = my_record['uuid']

    # results_per_page is search()'s page-size parameter. In the original search(),
    # ``size=`` is forwarded as a terms filter instead.
    hits = search(query_string=uuid, model=['project'], fields=['uuid'], results_per_page=10000, source=False)
    related_uuids = [hit['fields']['uuid'][0] for hit in hits if 'uuid' in hit.get('fields', {})]

    # A collection without members may carry None here.
    related_uuids += my_record.get('member') or []

    return related_uuids

def _hits_uuids(hits, model=None):
    """UUIDs of hits requested with ``fields=['uuid', ...]``, optionally only those of one ``model``."""
    return [hit['fields']['uuid'][0] for hit in hits
            if 'uuid' in hit.get('fields', {})
            and (model is None or hit['fields'].get('model') == [model])]


def _search_any(uuids, batch_size=1024, **kwargs):
    """Search for records mentioning any of ``uuids``, OR-ing at most ``batch_size`` per request.

    An empty ``uuids`` returns ``[]`` without searching. The original code sent
    ``query_string=''``, which construct_query turns into a plain match_all.
    """
    hits = []
    for start in range(0, len(uuids), batch_size):
        batch = uuids[start:start + batch_size]
        query_string = ' OR '.join(f"({u})" for u in batch)
        hits += search(query_string=query_string, results_per_page=10000, **kwargs)
    return hits


def _observations_from(hits):
    """Observations among ``hits`` plus observations referencing any composite process among ``hits``."""
    observations = _hits_uuids(hits, 'observation')
    composite_processes = _hits_uuids(hits, 'compositeprocess')
    rel_to_comps = _search_any(composite_processes, fields=['uuid', 'model'], source=False)
    return observations + _hits_uuids(rel_to_comps, 'observation')


def _related_via_acquisitions(my_record, role, partner, children_key):
    """Shared logic for instruments and platforms.

    Collects, in order:

    * observations that reference an acquisition mentioning this record,
      directly or through a composite process;
    * the ``partner`` of every instrumentPlatformPair where this record
      fills ``role``;
    * the UUIDs in ``my_record[children_key]``.
    """
    uuid = my_record['uuid']

    # _source is kept here: it is needed below for instrumentPlatformPair.
    acquisitions = search(query_string=uuid, model=['acquisition'], fields=['uuid'], results_per_page=10000)

    rel_to_acq = _search_any(_hits_uuids(acquisitions), fields=['uuid', 'model'], source=False)
    related_uuids = _observations_from(rel_to_acq)

    for acquisition in acquisitions:
        pairs = acquisition['_source'].get('instrumentPlatformPair') or []
        if not isinstance(pairs, list):
            # A single pair may be stored as a bare object.
            pairs = [pairs]
        related_uuids += [ipp[partner] for ipp in pairs if ipp.get(role) == uuid]

    related_uuids += my_record.get(children_key) or []
    return related_uuids


def get_related_objects_instrument(my_record):
    """Return UUIDs related to an instrument.

    The result contains observations reached through acquisitions or composite
    processes, the platforms paired with this instrument, and the UUIDs in
    ``subInstrument``.
    """
    return _related_via_acquisitions(my_record, role='instrument', partner='platform',
                                     children_key='subInstrument')


def get_related_objects_platform(my_record):
    """Return UUIDs related to a platform.

    The result contains observations reached through acquisitions or composite
    processes, the instruments paired with this platform, and the UUIDs in
    ``childPlatform``.
    """
    return _related_via_acquisitions(my_record, role='platform', partner='instrument',
                                     children_key='childPlatform')


def get_related_objects_computation(my_record):
    """Return UUIDs of observations related to a computation.

    Two routes are covered: observations mentioning the computation directly,
    and observations that reference a composite process mentioning it.
    """
    related_objects = search(query_string=my_record['uuid'], model=['observation', 'compositeprocess'],
                             fields=['uuid', 'model'], results_per_page=10000, source=False)
    return _observations_from(related_objects)


In [7]:
def get_related_objects(uuid):
    """Return the sorted, de-duplicated UUIDs of records related to ``uuid``.

    The record is looked up by UUID, and its ``model`` selects the matching
    ``get_related_objects_<model>`` function.

    Behaviour details:

    * Models without such a function (e.g. 'acquisition', 'compositeprocess')
      contribute nothing instead of raising ``KeyError``.
    * Empty/``None`` UUIDs are dropped.
    * The result is sorted so repeated calls return the same order.
    """
    functions_map = {
        'observation': get_related_objects_observation,
        'project': get_related_objects_project,
        'observationcollection': get_related_objects_collection,
        'instrument': get_related_objects_instrument,
        'platform': get_related_objects_platform,
        'computation': get_related_objects_computation,
    }

    related_uuids = []
    for record in search(uuid=[uuid]):
        source = record['_source']
        handler = functions_map.get(source['model'])
        if handler is None:
            continue
        related_uuids += handler(source)

    return sorted({related for related in related_uuids if related})

    

In [8]:
MODELS = ['observationcollection',
          'observation',
          'project',
          'instrument',
          'platform',
          'computation']


def display_records(response, query_args, max_hits=10):
    """Print a text summary of a search response.

    The summary has three parts:

    1. the total hit count;
    2. a per-model count, obtained with one extra ``search`` per entry of
       ``MODELS`` that reuses ``query_args`` restricted to that model and asks
       for one result (only ``hits.total`` is read);
    3. model, title, publication state (if present), UUID and keywords of the
       first ``max_hits`` hits.

    ``query_args`` is not modified. The original version overwrote its
    ``model``, ``hits`` and ``results_per_page`` entries.
    """
    hits = response['hits']['hits']
    total = response['hits']['total']['value']

    grouped_data = {}
    for model in MODELS:
        count_args = {**query_args, 'results_per_page': 1, 'hits': False, 'model': [model]}
        res = search(**count_args)
        grouped_data[model] = res['hits']['total']['value']

    print(f'{total} Results\n')

    print('Record type')
    for record_type, count in grouped_data.items():
        print(f'{record_type.capitalize() + "s"}({count})')
    print()

    for hit in hits[:max_hits]:
        fields = hit['_source']
        print(fields['model'].capitalize())
        print(fields.get('title', ''))
        if 'publicationState' in fields:
            print('Publication state:', fields['publicationState'])
        print(fields['uuid'])
        print(f"Keywords: {fields.get('keywords', '')}")
        print()

In [9]:
from time import time

In [10]:
# Time one full round trip: a search for "ozone" across every record type,
# then display_records' per-model count queries and printed summary.
# perf_counter is monotonic, which makes it the right clock for elapsed time.
start = perf_counter()
query_args = {
    'hits': False,
    'model': list(MODELS),  # every record type
    'results_per_page': 20,
    'page': 1,
    'query_string': 'ozone',
}
response = search(**query_args)
display_records(response, query_args)
print('Exec time', perf_counter() - start)

  if sys.path[0] == '':


1674 Results

Record type
Observationcollections(80)
Observations(1345)
Projects(84)
Instruments(80)
Platforms(16)
Computations(69)

Observation
Ozone eddy covariance measurements at Penlee Point Atmospheric Observatory, April-May 2018
Publication state: citable
8351ed155b134155848d03a7cdce9f02
Keywords: Ozone, eddy, Penlee Point

Observation
Global Sensitivity Analysis of Tropospheric Ozone and OH: Budgets from three global chemistry-climate models
Publication state: published
d5afa10e50b44229b079c7c5a036e660
Keywords: Tropospheric ozone burden, tropospheric methane lifetime, Gaussian process emulation, sensitivity analysis

Project
IPCC Fourth Assessment Report: Climate Change 2007 (AR4)
Publication state: working
0940f94c73bd4a63a7e126b3c23b60ec
Keywords: Climate Change, AR4, IPCC, DDC, CMIP3

Observation
20 and 30 year climatologies from CMIP3 climate model output as used in the 2007 IPCC Fourth Assessment Report (AR4)
Publication state: published
e6edf4076e7e42f681f495ccf0ec22cb
K

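### FAAM collection runtime test

This test times, over 10 runs, how long it takes to fetch every record related to the FAAM collection `affe775e8d8890a4556aec5bc4e0b45c`. The related UUIDs are requested in batches of 1024.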

In [8]:
from time import time

In [241]:
FAAM_COLLECTION_UUID = 'affe775e8d8890a4556aec5bc4e0b45c'
N_RUNS = 10
# UUIDs OR-ed into one query_string request. NOTE(review): presumably chosen to stay
# within Elasticsearch's boolean max_clause_count (1024 by default in older versions) — confirm.
BATCH_SIZE = 1024

# Computed once: it is not part of the timed section and does not change
# between runs. The original re-ran it inside the loop.
related = get_related_objects(FAAM_COLLECTION_UUID)

with open('runtime.tsv', 'a', encoding='utf-8') as f:
    # Append mode starts at end-of-file. Write the header only for a new/empty file,
    # so re-running the cell does not insert header rows between data rows.
    if f.tell() == 0:
        f.write('index\ttime\n')

    for _ in range(N_RUNS):
        start = perf_counter()
        hits = []
        for offset in range(0, len(related), BATCH_SIZE):
            batch = related[offset:offset + BATCH_SIZE]
            uuid_query = " OR ".join(f'"{uuid}"' for uuid in batch)
            # results_per_page is construct_query's page-size parameter. The original
            # construct_query treats an unknown ``size=`` as a terms filter.
            query = construct_query(query_string=f"uuid:({uuid_query})", results_per_page=BATCH_SIZE)

            response = es.search(index="stac-moles-test", body=query)
            hits += response['hits']['hits']

        end = perf_counter()
        print(len(hits))
        f.write(f'Moles\t{end - start}\n')


  from ipykernel import kernelapp as app


1295
1295
1295
1295
1295
1295
1295
1295
1295
1295
