In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from dotenv import load_dotenv

load_dotenv("../../.env.localhost")

True

In [3]:
from nmdc_runtime.pipelines.core import run_config_frozen__preset_normal_env
from nmdc_runtime.resources.mongo import get_mongo


mongo = get_mongo(run_config_frozen__preset_normal_env)
db = mongo.db
set(db.list_collection_names())

{'_tmp__get_file_size_bytes',
 'activity_set',
 'biosample_set',
 'capabilities',
 'data_object_set',
 'date_created',
 'etl_software_version',
 'file_type_enum',
 'functional_annotation_set',
 'genome_feature_set',
 'ids',
 'mags_activity_set',
 'metabolomics_analysis_activity_set',
 'metagenome_annotation_activity_set',
 'metagenome_assembly_set',
 'metaproteomics_analysis_activity_set',
 'metatranscriptome_activity_set',
 'nmdc_schema_version',
 'nom_analysis_activity_set',
 'notes',
 'object_types',
 'objects',
 'omics_processing_set',
 'operations',
 'read_QC_analysis_activity_set',
 'read_based_analysis_activity_set',
 'sites',
 'study_set',
 'triggers',
 'users',
 'workflows'}

In [4]:
from nmdc_runtime.pipelines.core import run_config_frozen__preset_normal_env
from nmdc_runtime.resources.core import get_runtime_api_site_client


client = get_runtime_api_site_client(run_config_frozen__preset_normal_env)

In [5]:
from nmdc_runtime.util import nmdc_jsonschema, nmdc_jsonschema_validate

In [6]:
gold_etl_latest = db.objects.find_one({"name": "nmdc_database.json.zip"}, sort=[("created_time", -1)])

In [7]:
rv = client.get_object_bytes(gold_etl_latest["id"])

In [9]:
from io import BytesIO
import json
from zipfile import ZipFile

with ZipFile(BytesIO(rv.content)) as myzip:
    # may be e.g. 'metadata-translation/src/bin/output/nmdc_database.json' rather than 'nmdc_database.json'
    name = next(n for n in myzip.namelist() if n.endswith("nmdc_database.json"))
    with myzip.open(name) as f:
        nmdc_database = json.load(f)

In [13]:
from pprint import pprint

nmdc_db_collection_names_to_drop = set(nmdc_jsonschema["definitions"]["Database"]["properties"])
nmdc_db_collection_names_to_drop -= {
    # not actually collections
    "activity_set",
    "nmdc_schema_version",
    "date_created",
    "etl_software_version",
    # big collections, loaded elsewhere
    "functional_annotation_set",
    "genome_feature_set",
    
}
pprint(nmdc_db_collection_names_to_drop)

{'biosample_set',
 'data_object_set',
 'mags_activity_set',
 'metabolomics_analysis_activity_set',
 'metagenome_annotation_activity_set',
 'metagenome_assembly_set',
 'metaproteomics_analysis_activity_set',
 'metatranscriptome_activity_set',
 'nom_analysis_activity_set',
 'omics_processing_set',
 'read_QC_analysis_activity_set',
 'read_based_analysis_activity_set',
 'study_set'}


In [14]:
def init_database(db, coll_names):
    """Reset each named collection and recreate its unique ``id`` index.

    Args:
        db: Database handle providing ``drop_collection(name)`` and item
            access (``db[name]``) that yields an object with ``create_index``.
        coll_names: Iterable of collection names to reset.

    Returns:
        None. Progress is reported on stdout, one line per collection.
    """
    for name in coll_names:
        print(f"dropping {name}, creating index")
        db.drop_collection(name)
        # Item access after the drop gives the handle on which the index is rebuilt.
        fresh_collection = db[name]
        fresh_collection.create_index("id", unique=True)
        
init_database(db, nmdc_db_collection_names_to_drop)

dropping metabolomics_analysis_activity_set, creating index
dropping metagenome_assembly_set, creating index
dropping metaproteomics_analysis_activity_set, creating index
dropping biosample_set, creating index
dropping study_set, creating index
dropping metatranscriptome_activity_set, creating index
dropping data_object_set, creating index
dropping read_QC_analysis_activity_set, creating index
dropping metagenome_annotation_activity_set, creating index
dropping read_based_analysis_activity_set, creating index
dropping omics_processing_set, creating index
dropping nom_analysis_activity_set, creating index
dropping mags_activity_set, creating index


# fix biosample.part_of and add docs

In [15]:
from pprint import pprint

from toolz import assoc_in, dissoc, get_in

from nmdc_runtime.api.core.util import pick

# Build a corrected copy of the biosample documents.
new_docs = []
for doc in nmdc_database["biosample_set"]:
    # Move the value stored under the key "part of" (with a space) to "part_of".
    if "part of" in doc:
        doc = assoc_in(doc, ["part_of"], get_in(["part of"], doc))
        doc = dissoc(doc, "part of")
    new_docs.append(doc)

# Swap the corrected list in; the trailing expression displays the document count.
nmdc_database["biosample_set"] = new_docs
len(nmdc_database["biosample_set"])

844

In [16]:
ok = nmdc_jsonschema_validate(nmdc_database)

In [17]:
rv = mongo.add_docs(nmdc_database)

In [18]:
rv['biosample_set'].upserted_count

844

# GOLD IDs to IGSNs

In [None]:
# from dagster import build_solid_context

# from nmdc_runtime.solids.core import local_file_to_api_object as lftao

# context = build_solid_context(resources={"mongo": mongo, "runtime_api_site_client": client})


# def local_file_to_api_object(file_info):
#     return lftao(context, file_info)

# #obj = local_file_to_api_object({"storage_path": storage_path, "mime_type": 'text/csv'})

In [19]:
oid__gold_ids_to_igns = "c6hs-4rzh-31"
doc = db.objects.find_one({"id": oid__gold_ids_to_igns})
assert doc["name"] == "2020-23-12-brodie-Gs0135149-soil-metadata.csv"

In [20]:
from collections import defaultdict
import csv
from io import StringIO
import re

# Zero-based column positions read from each CSV row below.
GOLD_ID_IDX = 5
IGSN_IDX = 2

# Maps each IGSN value to the list of GOLD ids found on rows carrying that IGSN.
igsn_golds = defaultdict(list)

gold_id_pattern = re.compile(r"Gb\d+")

f = StringIO(client.get_object_bytes(oid__gold_ids_to_igns).text)
reader = csv.reader(f)
for row in reader:
    gold_id = row[GOLD_ID_IDX]
    igsn = row[IGSN_IDX]
    # Only rows whose GOLD column is exactly "Gb" + digits are kept; any other
    # row (e.g. one with header text or a blank in that column) is skipped.
    if gold_id_pattern.fullmatch(gold_id):
        igsn_golds[igsn].append(gold_id)

In [21]:
from pprint import pprint

from toolz import get_in

new_biosample_docs = []

for igsn, golds in igsn_golds.items():
    igsn_curie = "igsn:"+igsn
    doc = db.biosample_set.find_one({"id": {"$in": [f"gold:{g}" for g in golds]}})
    if doc is None:
        print(igsn, golds)
        continue
    doc = assoc_in(doc, ["id"], igsn_curie)
    doc = dissoc(doc, "_id")
    new_biosample_docs.append(doc)

assert len(new_biosample_docs) == len(igsn_golds)

In [22]:
mongo.add_docs({"biosample_set": new_biosample_docs})

{'biosample_set': <pymongo.results.BulkWriteResult at 0x7f8a1091d040>}

In [23]:
from pymongo import DeleteMany
from toolz import concat

requests = [DeleteMany({"id": {"$in": ["gold:"+g for g in concat(igsn_golds.values())]}})]
rv = mongo.db.biosample_set.bulk_write(requests)
rv.deleted_count

48

# Update omics_processing_set references to biosample_set ids

In [24]:
goldid_igsn = {}
for igsn, gids in igsn_golds.items():
    for gid in gids:
        goldid_igsn[gid] = igsn

In [25]:
requests = []
to_replace = {"gold:"+k: "igsn:"+v for k, v in goldid_igsn.items()}

for doc in db.omics_processing_set.find({"has_input": {"$in": list(to_replace)}}):
    operations = {"$set": {
        "has_input": [to_replace.get(i, i) for i in doc["has_input"]],
    }}
    requests.append({"filter": {"_id": doc["_id"]}, "update": operations})

In [26]:
from pymongo import UpdateOne

rv = db.omics_processing_set.bulk_write([UpdateOne(**r) for r in requests])

In [27]:
rv.modified_count

93

In [28]:
docs = [dissoc(doc, "_id") for doc in db.omics_processing_set.find()]

In [29]:
ok = nmdc_jsonschema_validate({"omics_processing_set": docs})

# Update omics_processing_set references from EMSL ids to IGSNs

Skip this -- it updates zero documents!

In [30]:
EMSL_IDS_IDX = 7
IGSN_IDX = 2

igsn_emsls = {}

emsl_ids_pattern = re.compile(r"\d+")

f = StringIO(client.get_object_bytes(oid__gold_ids_to_igns).text)
reader = csv.reader(f)
for row in reader:
    emsl_ids = row[EMSL_IDS_IDX]
    igsn = row[IGSN_IDX]
    ids = emsl_ids_pattern.findall(emsl_ids)
    # XXX some rows have emsl ids but no IGSN, so igsn.strip() check here
    if igsn.strip() and ids:
        igsn_emsls[igsn] = ids

In [31]:
emslid_igsn = {}
for igsn, eids in igsn_emsls.items():
    for eid in eids:
        emslid_igsn[eid] = igsn

In [32]:
n_with_emsl_id = db.omics_processing_set.count_documents(
    {"id": {"$in": ["emsl:"+i for i in emslid_igsn]}})

In [33]:
requests = []
to_replace = {"emsl:"+k: "igsn:"+v for k, v in emslid_igsn.items()}
to_replace.update({"emsl:output_"+k: "igsn:"+v for k, v in emslid_igsn.items()})

def omit(blacklist, d):
    """Return a new dict holding the items of ``d`` whose keys are not in ``blacklist``.

    Implemented with a plain dict comprehension; the previous version called
    ``keyfilter``, which is never imported in this notebook and therefore
    raised ``NameError`` on first use.

    Args:
        blacklist: Container of keys to leave out.
        d: Mapping to filter. It is not modified.

    Returns:
        A new ``dict`` without the blacklisted keys.
    """
    return {k: v for k, v in d.items() if k not in blacklist}

def sans_mongo_id(d):
    """Return a copy of ``d`` without the ``_id`` key."""
    return omit(["_id"], d)


for doc in db.omics_processing_set.find({"has_input": {"$in": list(to_replace)}}):
    operations = {"$set": {
        "has_input": [to_replace.get(i, i) for i in doc["has_input"]],
    }}
    requests.append({"filter": {"_id": doc["_id"]}, "update": operations})

In [34]:
if requests:
    rv = db.omics_processing_set.bulk_write([UpdateOne(**r) for r in requests])
    print(rv.modified_count)

# metaP_stegen.ipynb

In [43]:
to_fetch = [{
    # >100MB
    "url": "https://portal.nersc.gov/cfs/m3408/meta/stegen_MetaProteomicAnalysis_activity.json",
    "type": "metaproteomics_analysis_activity_set",
}, {
    # ~50KB
    "url": "https://portal.nersc.gov/cfs/m3408/meta/stegen_emsl_analysis_data_objects.json",
    "type": "data_object_set"
}]

In [44]:
import re

pattern = re.compile(r"https?://(?P<domain>[^/]+)/(?P<path>.+)")

def url_to_name(url):
    """Derive a flat file name from an http(s) URL.

    The domain labels are reversed and joined with dots, followed by ``__``
    and the path with every ``/`` replaced by ``.``; e.g.
    ``https://portal.nersc.gov/cfs/x.json`` -> ``gov.nersc.portal__cfs.x.json``.

    Args:
        url: An ``http://`` or ``https://`` URL with a non-empty path.

    Returns:
        The derived name as a string.

    Raises:
        ValueError: If ``url`` does not match the expected shape.
    """
    m = pattern.match(url)
    if m is None:
        # Without this guard the failure surfaces as an AttributeError on None.
        raise ValueError(f"cannot derive a file name from URL: {url!r}")
    reversed_domain = ".".join(reversed(m.group("domain").split(".")))
    flattened_path = m.group("path").replace("/", ".")
    return f"{reversed_domain}__{flattened_path}"

In [45]:
import json
from pathlib import Path

import requests

def download_them_all(to_fetch, download_dir="/Users/dwinston/Downloads"):
    """Download each spec's URL sequentially and save its JSON payload to disk.

    Args:
        to_fetch: Sequence of dicts, each with a ``"url"`` key.
        download_dir: Directory the files are written to. Defaults to the
            location this notebook originally used.

    Raises:
        requests.HTTPError: If a response carries an HTTP error status.
    """
    target_dir = Path(download_dir)
    for i, spec in enumerate(to_fetch):
        url = spec["url"]
        name = url_to_name(url)
        print(f"{i+1}/{len(to_fetch)}: fetching {url}")
        rv = requests.get(url)
        # Stop on an HTTP error status instead of trying to decode and save
        # whatever body the error response carries.
        rv.raise_for_status()
        print(f"saving as {name}")
        with open(target_dir / name, 'w') as f:
            json.dump(rv.json(), f)

In [46]:
download_them_all(to_fetch)

1/2: fetching https://portal.nersc.gov/cfs/m3408/meta/stegen_MetaProteomicAnalysis_activity.json
saving as gov.nersc.portal__cfs.m3408.meta.stegen_MetaProteomicAnalysis_activity.json
2/2: fetching https://portal.nersc.gov/cfs/m3408/meta/stegen_emsl_analysis_data_objects.json
saving as gov.nersc.portal__cfs.m3408.meta.stegen_emsl_analysis_data_objects.json


In [55]:
from toolz import identity, dissoc, assoc_in

metaP_field_map = {
    "PeptideSequence": ("peptide_sequence", identity),
    "sum(MASICAbundance)": ("peptide_sum_masic_abundance", int),
    "sum_MASICAbundance": ("peptide_sum_masic_abundance", int),
    "SpectralCount": ("peptide_spectral_count", int),
    "BestProtein": ("best_protein", identity),
    "min(QValue)": ("min_q_value", float),
    "min_QValue": ("min_q_value", float),
    
    "peptide_sequence": ("peptide_sequence", identity),
    "peptide_sum_masic_abundance": ("peptide_sum_masic_abundance", int),
    "peptide_spectral_count": ("peptide_spectral_count", int),
    "best_protein": ("best_protein", identity),
    "min_q_value": ("min_q_value", float),
}


def map_fields(doc, field_map=None):
    """Rename and convert fields of ``doc`` according to ``field_map``.

    Args:
        doc: Mapping to transform. It is not modified.
        field_map: Mapping of ``old_key -> (new_key, converter)``. Entries are
            applied in iteration order against the progressively updated
            result. ``None`` (the default) or an empty mapping is a no-op.

    Returns:
        ``doc`` itself when ``field_map`` is empty/None, otherwise a new dict
        with the mapped keys renamed and their values converted.

    Raises:
        ValueError: If a value cannot be converted even after the float fallback.
    """
    if not field_map:
        # The declared default used to crash on ``None.items()``.
        return doc
    mapped = dict(doc)
    for k_old, (k_new, fn) in field_map.items():
        if k_old not in mapped:
            continue
        raw = mapped.pop(k_old)
        # work around e.g. "ValueError: invalid literal for int() with base 10: '400840000.0'"
        try:
            v_new = fn(raw)
        except ValueError:
            v_new = fn(float(raw))
        mapped[k_new] = v_new
    return mapped


def correct_metaP_doc(doc):
    """Normalize the peptide quantification entries of a metaP document.

    Documents without a ``has_peptide_quantifications`` key are returned
    unchanged. Otherwise every entry under that key is passed through
    ``map_fields`` with ``metaP_field_map`` and the resulting list is stored
    back under the same key via ``assoc_in``.
    """
    quant_key = "has_peptide_quantifications"
    if quant_key not in doc:
        return doc
    remapped = [map_fields(entry, metaP_field_map) for entry in doc[quant_key]]
    return assoc_in(doc, [quant_key], remapped)

In [56]:
def fetch_downloaded_json(name, download_dir="/Users/dwinston/Downloads"):
    """Load a previously downloaded JSON file by its saved name.

    Args:
        name: File name inside ``download_dir``.
        download_dir: Directory holding the downloaded files. Defaults to the
            location this notebook originally used.

    Returns:
        The decoded JSON content.
    """
    with open(Path(download_dir) / name) as f:
        return json.load(f)

In [57]:
from collections import defaultdict

def fetch_metaP_validate_and_add(to_fetch):
    """Load downloaded metaP payloads, correct them, validate, and add them.

    For each spec (a dict with ``"url"`` and ``"type"``) the locally saved
    JSON is read via ``fetch_downloaded_json``, wrapped in a list when it is
    not one already, and run through ``correct_metaP_doc``. Documents are
    grouped by ``spec["type"]``, passed to ``nmdc_jsonschema_validate``, and
    then handed to ``mongo.add_docs`` with ``validate=False``.
    """
    staged = defaultdict(list)
    for index, spec in enumerate(to_fetch):
        url = spec["url"]
        basename = Path(url).name
        collection_name = spec["type"]
        print(f"{index+1}/{len(to_fetch)}: fetching {basename} ({collection_name})")
        loaded = fetch_downloaded_json(url_to_name(url))
        batch = loaded if isinstance(loaded, list) else [loaded]
        corrected = [correct_metaP_doc(item) for item in batch]
        staged[collection_name].extend(corrected)
    print("validating")
    nmdc_jsonschema_validate(staged)
    print("adding")
    mongo.add_docs(staged, validate=False)

In [58]:
fetch_metaP_validate_and_add(to_fetch)

1/2: fetching stegen_MetaProteomicAnalysis_activity.json (metaproteomics_analysis_activity_set)
2/2: fetching stegen_emsl_analysis_data_objects.json (data_object_set)
validating
adding


In [59]:
to_fetch = [{
    "url": "https://portal.nersc.gov/project/m3408/meta/501128_1781_100340_stegen_MetaProteomicAnalysis_activity.json",
    "type": "metaproteomics_analysis_activity_set",
}, {
    "url": "https://portal.nersc.gov/project/m3408/meta/501128_1781_100340_stegen_emsl_analysis_data_objects.json",
    "type": "data_object_set"
}]

In [60]:
download_them_all(to_fetch)

1/2: fetching https://portal.nersc.gov/project/m3408/meta/501128_1781_100340_stegen_MetaProteomicAnalysis_activity.json
saving as gov.nersc.portal__project.m3408.meta.501128_1781_100340_stegen_MetaProteomicAnalysis_activity.json
2/2: fetching https://portal.nersc.gov/project/m3408/meta/501128_1781_100340_stegen_emsl_analysis_data_objects.json
saving as gov.nersc.portal__project.m3408.meta.501128_1781_100340_stegen_emsl_analysis_data_objects.json


In [61]:
fetch_metaP_validate_and_add(to_fetch)

1/2: fetching 501128_1781_100340_stegen_MetaProteomicAnalysis_activity.json (metaproteomics_analysis_activity_set)
2/2: fetching 501128_1781_100340_stegen_emsl_analysis_data_objects.json (data_object_set)
validating
adding


# mongo_etl_demo.ipynb

In [62]:
to_fetch = [{
    "url": "https://portal.nersc.gov/cfs/m3408/meta/mt_annotation_objects.json",
    "type": "metagenome_annotation_activity_set"
}, {
    "url": "https://portal.nersc.gov/cfs/m3408/meta/mt_annotation_data_objects.json",
    "type": "data_object_set"
}, {
    "url": "https://portal.nersc.gov/project/m3408/meta/metagenomeAssembly_activity.json",
    "type": "metagenome_assembly_set",
}, {
    "url": "https://portal.nersc.gov/project/m3408/meta/metagenomeAssembly_data_objects.json",
    "type": "data_object_set",
}, {
    "url": "https://portal.nersc.gov/cfs/m3408/meta/ReadbasedAnalysis_activity.json",
    "type": "read_based_analysis_activity_set"
}, {
    "url": "https://portal.nersc.gov/cfs/m3408/meta/ReadbasedAnalysis_data_objects.json",
    "type": "data_object_set"
}, {
    "url": "https://portal.nersc.gov/cfs/m3408/meta/MAGs_activity.json",
    "type": "mags_activity_set",
}, {
    "url": "https://portal.nersc.gov/cfs/m3408/meta/MAGs_data_objects.json",
    "type": "data_object_set"
}, {
    "url": "https://portal.nersc.gov/project/m3408/meta/readQC_activity.json",
    "type": "read_QC_analysis_activity_set"
}, {
    "url": "https://portal.nersc.gov/project/m3408/meta/readQC_activity_data_objects.json",
    "type": "data_object_set"
}, {
    "url": "https://portal.nersc.gov/cfs/m3408/meta/img_mg_annotation_objects.json",
    "type": "metagenome_annotation_activity_set",
}, {
    "url": "https://portal.nersc.gov/cfs/m3408/meta/img_mg_annotation_data_objects.json",
    "type": "data_object_set",
}, {
    "url": "https://nmdcdemo.emsl.pnnl.gov/metabolomics/registration/gcms_metabolomics_data_products.json",
    "type": "data_object_set"
}, {
    "url": "https://nmdcdemo.emsl.pnnl.gov/nom/registration/ftms_nom_data_products.json",
    "type": "data_object_set"
}]

In [63]:
download_them_all(to_fetch)

1/14: fetching https://portal.nersc.gov/cfs/m3408/meta/mt_annotation_objects.json
saving as gov.nersc.portal__cfs.m3408.meta.mt_annotation_objects.json
2/14: fetching https://portal.nersc.gov/cfs/m3408/meta/mt_annotation_data_objects.json
saving as gov.nersc.portal__cfs.m3408.meta.mt_annotation_data_objects.json
3/14: fetching https://portal.nersc.gov/project/m3408/meta/metagenomeAssembly_activity.json
saving as gov.nersc.portal__project.m3408.meta.metagenomeAssembly_activity.json
4/14: fetching https://portal.nersc.gov/project/m3408/meta/metagenomeAssembly_data_objects.json
saving as gov.nersc.portal__project.m3408.meta.metagenomeAssembly_data_objects.json
5/14: fetching https://portal.nersc.gov/cfs/m3408/meta/ReadbasedAnalysis_activity.json
saving as gov.nersc.portal__cfs.m3408.meta.ReadbasedAnalysis_activity.json
6/14: fetching https://portal.nersc.gov/cfs/m3408/meta/ReadbasedAnalysis_data_objects.json
saving as gov.nersc.portal__cfs.m3408.meta.ReadbasedAnalysis_data_objects.json
7/

In [64]:
from collections import defaultdict

def fetch_validate_and_add(to_fetch):
    """Load downloaded payloads, validate them together, and add them.

    For each spec (a dict with ``"url"`` and ``"type"``) the locally saved
    JSON is read via ``fetch_downloaded_json`` and wrapped in a list when it
    is not one already. Documents are grouped by ``spec["type"]``, passed to
    ``nmdc_jsonschema_validate``, and then handed to ``mongo.add_docs`` with
    ``validate=False``.
    """
    staged = defaultdict(list)
    for index, spec in enumerate(to_fetch):
        url = spec["url"]
        basename = Path(url).name
        collection_name = spec["type"]
        print(f"{index+1}/{len(to_fetch)}: fetching {basename} ({collection_name})")
        loaded = fetch_downloaded_json(url_to_name(url))
        staged[collection_name].extend(loaded if isinstance(loaded, list) else [loaded])
    print("validating")
    nmdc_jsonschema_validate(staged)
    print("adding")
    mongo.add_docs(staged, validate=False)

In [65]:
fetch_validate_and_add(to_fetch)

1/14: fetching mt_annotation_objects.json (metagenome_annotation_activity_set)
2/14: fetching mt_annotation_data_objects.json (data_object_set)
3/14: fetching metagenomeAssembly_activity.json (metagenome_assembly_set)
4/14: fetching metagenomeAssembly_data_objects.json (data_object_set)
5/14: fetching ReadbasedAnalysis_activity.json (read_based_analysis_activity_set)
6/14: fetching ReadbasedAnalysis_data_objects.json (data_object_set)
7/14: fetching MAGs_activity.json (mags_activity_set)
8/14: fetching MAGs_data_objects.json (data_object_set)
9/14: fetching readQC_activity.json (read_QC_analysis_activity_set)
10/14: fetching readQC_activity_data_objects.json (data_object_set)
11/14: fetching img_mg_annotation_objects.json (metagenome_annotation_activity_set)
12/14: fetching img_mg_annotation_data_objects.json (data_object_set)
13/14: fetching gcms_metabolomics_data_products.json (data_object_set)
14/14: fetching ftms_nom_data_products.json (data_object_set)
validating
adding


In [66]:
manifests = [{
    "url": (
        "https://nmdcdemo.emsl.pnnl.gov/metabolomics/registration/"
        "gcms_metabolomics_metadata_products.json"
    ),
    "type": "metabolomics_analysis_activity_set"
}, {
    "url": (
        "https://nmdcdemo.emsl.pnnl.gov/nom/registration/"
        "ftms_nom_metadata_products.json"
    ),
    "type": "nom_analysis_activity_set"
}]

In [67]:
import requests


to_fetch = []

for m in manifests:
    urls = requests.get(m["url"]).json()
    for url in urls:
        to_fetch.append({"url": url, "type": m["type"]})

In [68]:
to_fetch

[{'url': 'https://nmdcdemo.emsl.pnnl.gov/metabolomics/metadata/Froze_Core_2015_S2_0_10_7_Metab.json',
  'type': 'metabolomics_analysis_activity_set'},
 {'url': 'https://nmdcdemo.emsl.pnnl.gov/metabolomics/metadata/Froze_Core_2015_S3_20_30_14_Metab.json',
  'type': 'metabolomics_analysis_activity_set'},
 {'url': 'https://nmdcdemo.emsl.pnnl.gov/metabolomics/metadata/Froze_Core_2015_S3_10_20_5_Metab.json',
  'type': 'metabolomics_analysis_activity_set'},
 {'url': 'https://nmdcdemo.emsl.pnnl.gov/metabolomics/metadata/Froze_Core_2015_S3_50_60_4_Metab.json',
  'type': 'metabolomics_analysis_activity_set'},
 {'url': 'https://nmdcdemo.emsl.pnnl.gov/metabolomics/metadata/Froze_Core_2015_S1_40_50_17_Metab.json',
  'type': 'metabolomics_analysis_activity_set'},
 {'url': 'https://nmdcdemo.emsl.pnnl.gov/metabolomics/metadata/Froze_Core_2015_N1_0_10_13_Metab.json',
  'type': 'metabolomics_analysis_activity_set'},
 {'url': 'https://nmdcdemo.emsl.pnnl.gov/metabolomics/metadata/Froze_Core_2015_S1_30_40

In [69]:
import concurrent.futures
import json

import requests
from tqdm.notebook import tqdm

def fetch_json(url):
    """GET ``url`` and return its decoded JSON body.

    Raises:
        requests.HTTPError: If the response carries an HTTP error status, so
            an error page is reported as a failure rather than decoded.
    """
    response = requests.get(url)
    response.raise_for_status()
    return response.json()


def download_them_all_parallel(to_fetch, download_dir="/Users/dwinston/Downloads"):
    """Download every spec's URL on a thread pool and save each JSON payload.

    Args:
        to_fetch: Sequence of dicts, each with a ``"url"`` key.
        download_dir: Directory the files are written to. Defaults to the
            location this notebook originally used.

    Returns:
        A list of ``(url, error message)`` tuples for the fetches that raised.
    """
    error_urls = []
    urls = [spec["url"] for spec in to_fetch]

    # Both context managers are released even if writing a file raises, so the
    # progress bar is no longer left open on failure.
    with tqdm(total=len(to_fetch)) as pbar, concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_url = {
            executor.submit(fetch_json, url): url
            for url in urls
        }
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                payload = future.result()
            except Exception as e:
                # Record fetch failures and keep going with the remaining URLs.
                error_urls.append((url, str(e)))
            else:
                name = url_to_name(url)
                with open(f'{download_dir}/{name}', 'w') as f:
                    json.dump(payload, f)
            pbar.update(1)

    return error_urls

In [70]:
download_them_all_parallel(to_fetch)

  0%|          | 0/997 [00:00<?, ?it/s]

[]

In [71]:
def fetch_downloaded_json_given_url(url):
    """Load the locally saved JSON payload that corresponds to ``url``.

    The saved file name is derived with ``url_to_name``; reading is delegated
    to ``fetch_downloaded_json``, which opens the same download location.
    """
    saved_name = url_to_name(url)
    return fetch_downloaded_json(saved_name)

In [72]:
import requests
from tqdm.notebook import tqdm

def validate_and_add_parallel(to_fetch):
    """Load every spec's downloaded JSON on a thread pool, grouped by type.

    Note that, despite its name, this function neither validates nor writes
    anything; it only reads files via ``fetch_downloaded_json_given_url``.

    Returns:
        A ``(payloads_by_type, errors)`` pair: a ``defaultdict(list)`` keyed
        by ``spec["type"]``, and a list of ``(url, derived file name, error
        message)`` tuples for the loads that raised.
    """
    loaded_by_type = defaultdict(list)
    failures = []
    progress = tqdm(total=len(to_fetch))

    with concurrent.futures.ThreadPoolExecutor() as pool:
        pending = {}
        for spec in to_fetch:
            pending[pool.submit(fetch_downloaded_json_given_url, spec["url"])] = spec
        for finished in concurrent.futures.as_completed(pending):
            source = pending[finished]
            try:
                payload = finished.result()
            except Exception as exc:
                failed_url = source["url"]
                failures.append((failed_url, url_to_name(failed_url), str(exc)))
            else:
                loaded_by_type[source["type"]].append(payload)
            progress.update(1)

    progress.close()
    return loaded_by_type, failures

In [73]:
nmdc_db, error_urls = validate_and_add_parallel(to_fetch)

  0%|          | 0/997 [00:00<?, ?it/s]

In [74]:
ok = nmdc_jsonschema_validate(nmdc_db)

In [75]:
error_urls

[]

In [76]:
mongo.add_docs(nmdc_db)

{'metabolomics_analysis_activity_set': <pymongo.results.BulkWriteResult at 0x7f8a219a2a40>,
 'nom_analysis_activity_set': <pymongo.results.BulkWriteResult at 0x7f8a21fe9480>}

MetaG annotations (`/global/project/projectdirs/m3408/www/meta/anno2/*_annotations.json`) are 155 JSON files totalling ~83GB. To load them into MongoDB, I took the following steps:
1. Set up a Globus transfer from NERSC DTN to a Globus Connect Personal endpoint on my laptop. I could e.g.
```
$ scp dtn01.nersc.gov:/global/project/projectdirs/m3408/www/meta/anno2/*_annotations.json .
```
but I chose to use Globus, and it works well.
2. I have a bash script that uses GNU sed to transform each JSON file to a simple JSON Lines file, as expected by `mongoimport`:

```bash
# trim.sh

# Convert one annotations JSON file into a JSON Lines file under anno2/.
# Note: the body reads the global $datafile set by the loop below, not the
# positional argument it is called with.
task(){
    echo $datafile
    # 1st gsed: delete the first two lines and the last line.
    # 2nd gsed: delete the (new) last line.
    # 3rd gsed: replace the first "}," on each line with "}".
    gsed -e '1,2d' -e '$d' $datafile | gsed -e '$d' | gsed s/\}\,/\}/ > anno2/$(basename $datafile).jsonl
}

# Start one background task per annotations file.
for datafile in ~/globus-nersc/nmdc/m3408/www/meta/anno2/*_annotations.json; do
    task $datafile &
done
```
I use `ps aux | grep "gsed s" | wc -l` to monitor the progress of the parallel sed tasks. I found that trying to do this head/tail file trimming by `json.load`ing the files in Python and resaving was quite slow because the JSON files are individually quite large.


3. I use `jq` to filter for KEGG orthology annotations only, as these are currently the only annotations ingested by the data portal, reducing the number of annotation documents tenfold (from ~500M to ~50M):

```bash
# jqfilter.sh

task(){
    echo $datafile
    jq 'select(.has_function|test("^KEGG."))' $datafile > anno2/$(basename $datafile).filtered.jsonl
}

for datafile in anno2/*_annotations.json.jsonl; do
    task $datafile &
done
```

4. I have a bash script that `mongoimport`s each filtered json lines file to the database

```bash
# mongoimport.sh
# Note: be sure to remove any remote collection indexes first, to speed up import. Then, re-create indexes.

n=$(ls anno2/*_annotations.*filtered.jsonl | wc -l | xargs) # `| xargs` to trim whitespace
i=1
for datafile in anno2/*_annotations.*filtered.jsonl; do
    echo "($i of $n): $datafile"
    mongoimport -h HOST \
        -u USER -p PASS --authenticationDatabase admin \
        -d nmdc -c functional_annotation_set \
        --numInsertionWorkers 8 \
        $datafile
    i=$((i+1))
done
```

Each import specifies multiple insertion workers (8 in this case) via `--numInsertionWorkers`.

# ghissue_252_253_linked_samples.ipynb
Biosample linking update

In [77]:
# Read the linking CSV into a list of token lists, one per data row.
rows = []
with open("../src/data/2021-02-03-stegen_biosample_linking_update.csv") as f:
    next(f) # skip header row
    for row in f:
        line = row.strip()
        # Plain comma split: quoted fields containing commas would be broken
        # apart — assumes the file has none; TODO confirm.
        tokens = line.split(",")
        # A line ending in a comma yields a trailing empty token; drop it.
        if tokens[-1] == '':
            rows.append(tokens[:-1])
        else:
            rows.append(tokens)

In [78]:
import json
from pprint import pprint


with open("../src/data/2021-02-04-stegen_biosample_template.json") as f:
    s = f.read()

try:
    stegen_sample_template = json.loads(s)
except json.JSONDecodeError:
    stegen_sample_template = json.loads(
        s.replace('\n', '')\
        .replace("$BIOSAMPLE_ID", '"$BIOSAMPLE_ID"')\
        .replace("“", '"')
    )

In [79]:
import re

gold_pattern = re.compile(r"Gb\d+")

def prefix_sample_id(s):
    """Return ``s`` as a prefixed identifier.

    Strings that already contain ``:`` are returned unchanged. Otherwise a
    string consisting entirely of ``Gb`` followed by digits gets the
    ``gold:`` prefix, and anything else gets ``emsl:``.
    """
    if ":" in s:
        return s
    prefix = "gold:" if gold_pattern.fullmatch(s) else "emsl:"
    return prefix + s

In [80]:
omics = []
for i, row in enumerate(rows):
    omics.append({
        "omics_id": row[0],
        "omics_type": row[1],
        "sample_name": row[2],
        "sample_id": prefix_sample_id(row[3]),
        "new": len(row) > 4 and row[4] == "TRUE"
    })

In [81]:
existing_ids = [
    d["id"] for d in
    db.biosample_set.find({"id": {"$in": [o["sample_id"] for o in omics]}}, ["id"])
]

In [82]:
from toolz import assoc_in, get_in

def transform_in(doc, keys, fn):
    """Apply ``fn`` to the value found at the nested path ``keys`` in ``doc``.

    The current value is looked up with ``get_in`` and the result of
    ``fn(value)`` is stored at the same path with ``assoc_in``, whose return
    value is returned.
    """
    return assoc_in(doc, keys, fn(get_in(keys, doc)))

In [83]:
def fill_template(template, sample_id, sample_name):
    """Instantiate a biosample document from ``template``.

    Sets ``id`` to ``sample_id`` and replaces the ``$BIOSAMPLE_NAME``
    placeholder with ``sample_name`` in ``identifier.has_raw_value`` and in
    ``name``.
    """
    def substitute_name(text):
        return text.replace("$BIOSAMPLE_NAME", sample_name)

    filled = assoc_in(template, ["id"], sample_id)
    for path in (["identifier", "has_raw_value"], ["name"]):
        filled = transform_in(filled, path, substitute_name)
    return filled

In [84]:
# def term_subdocs_to_id_strings(doc):
#     keys_with_term_ids = [
#         k for k in doc
#         if isinstance(doc[k], dict)
#         and "term" in doc[k]
#         and "id" in doc[k]["term"]
#     ]
#     for k in keys_with_term_ids:
#         doc = assoc_in(doc, [k, "term"], doc[k]["term"]["id"])
#     return doc

In [85]:
new_samples = {}
for o in omics:
    if o["new"]:
        new_samples[o["sample_id"]] = o["sample_name"]

docs = []

for sample_id, sample_name in new_samples.items():
    doc = fill_template(stegen_sample_template, sample_id, sample_name)
    #doc = term_subdocs_to_id_strings(doc)
    docs.append(doc)

In [86]:
from toolz import get_in, assoc_in

def un_raw_value(doc, key):
    """Replace ``doc[key]`` with its nested ``has_raw_value``, when present.

    If ``get_in`` finds no non-None value at ``[key, "has_raw_value"]``,
    ``doc`` is returned as-is.
    """
    raw = get_in([key, "has_raw_value"], doc)
    if raw is None:
        return doc
    return assoc_in(doc, [key], raw)

def re_raw_value(doc, key):
    """Wrap a bare value at ``doc[key]`` as ``{"has_raw_value": value}``.

    Args:
        doc: Document (dict) to transform. It is not modified; the previous
            implementation deleted ``key`` from the caller's dict in place.
        key: Top-level field name to wrap.

    Returns:
        ``doc`` itself when the key is missing, None, or already a dict;
        otherwise a new dict with the wrapped value stored under ``key``.
    """
    value = doc.get(key)
    if value is None or isinstance(value, dict):
        return doc
    # Copy everything except the key, then add the wrapped value at the end.
    wrapped = {k: v for k, v in doc.items() if k != key}
    wrapped[key] = {"has_raw_value": value}
    return wrapped

raws = [
    "ecosystem",
    "collection_date",
    "community",
    "ecosystem_category",
    "ecosystem_subtype",
    "ecosystem_type",
    "geo_loc_name",
    "habitat",
    "identifier",
    "location",
    "ncbi_taxonomy_name",
    "sample_collection_site",
    "specific_ecosystem",
]
timestampvalue_fields = [
    p for p, spec in nmdc_jsonschema['definitions']['Biosample']['properties'].items()
    if '$ref' in spec and spec["$ref"].endswith("TimestampValue")
]
textvalue_fields = [
    p for p, spec in nmdc_jsonschema['definitions']['Biosample']['properties'].items()
    if '$ref' in spec and spec["$ref"].endswith("TextValue")
]

for key in raws:
    docs = [un_raw_value(d, key) for d in docs]
for key in timestampvalue_fields + textvalue_fields:
    docs = [re_raw_value(d, key) for d in docs]

In [87]:
ok = nmdc_jsonschema_validate({"biosample_set": docs})

In [88]:
rv = mongo.add_docs({"biosample_set": docs})

In [89]:
rv['biosample_set'].upserted_count

35

Second checklist item of GH Issue 252

In [90]:
omics = [
    transform_in(o, ["omics_id"], lambda s: "emsl:"+s if ":" not in s else s)
    for o in omics
]

In [91]:
omics_ids = [o["omics_id"] for o in omics]

found_omics_ids = [
    d["id"] for d in
    db.omics_processing_set.find({"id": {"$in": omics_ids}},["id"])
]

In [92]:
assert set(omics_ids) == set(found_omics_ids)

In [93]:
omics_updates = {}
for o in omics:
    omics_updates[o["omics_id"]] = o

In [94]:
from toolz import dissoc

replacing_omics_type = {}

docs = []

for doc in db.omics_processing_set.find({"id": {"$in": omics_ids}}):
    omics_type = get_in(["omics_type"], doc)
    updates = omics_updates[doc["id"]]
    new_omics_type = {"has_raw_value": updates["omics_type"]}
    if omics_type != new_omics_type:
        replacing_omics_type[doc["id"]] = {"from": omics_type, "to": new_omics_type}
    doc = assoc_in(doc, ["omics_type"], new_omics_type)
    doc = assoc_in(doc, ["has_input"], [updates["sample_id"]])
    docs.append(dissoc(doc, "_id"))

In [95]:
replacing_omics_type

{}

In [96]:
ok = nmdc_jsonschema_validate({"omics_processing_set": docs})

In [97]:
rv = mongo.add_docs({"omics_processing_set": docs})

In [98]:
rv['omics_processing_set'].modified_count

434

GH issue 253 - Brodie

In [99]:
rows = []
with open("../src/data/2021-02-04-brodie_biosample_linking_update.csv") as f:
    next(f) # skip header row
    for row in f:
        line = row.strip()
        tokens = line.split(",")
        if tokens[-1] == '':
            rows.append(tokens[:-1])
        else:
            rows.append(tokens)

In [100]:
omics = []
for i, row in enumerate(rows):
    omics.append({
        "omics_id": "emsl:" + row[0].strip(),
        "omics_type": row[1].strip(),
        "sample_name": row[2].strip(),
        "sample_id": "igsn:" + row[3].strip(),
    })

In [101]:
omics_ids = [o["omics_id"] for o in omics]

found_omics_ids = [
    d["id"] for d in
    db.omics_processing_set.find({"id": {"$in": omics_ids}},["id"])
]

In [102]:
assert set(omics_ids) == set(found_omics_ids)

In [103]:
omics_updates = {}
for o in omics:
    omics_updates[o["omics_id"]] = o

In [104]:
replacing_omics_type = {}

docs = []

for doc in db.omics_processing_set.find({"id": {"$in": omics_ids}}):
    omics_type = get_in(["omics_type"], doc)
    updates = omics_updates[doc["id"]]
    new_omics_type = {"has_raw_value": updates["omics_type"]}
    if omics_type != new_omics_type:
        replacing_omics_type[doc["id"]] = {"from": omics_type, "to": new_omics_type}
    doc = assoc_in(doc, ["omics_type"], new_omics_type)
    doc = assoc_in(doc, ["has_input"], [updates["sample_id"]])
    docs.append(dissoc(doc, "_id"))

In [105]:
replacing_omics_type

{}

In [106]:
ok = nmdc_jsonschema_validate({"omics_processing_set": docs})

In [107]:
rv = mongo.add_docs({"omics_processing_set": docs})

In [108]:
rv['omics_processing_set'].modified_count

738

# ghissue_255.ipynb

Remove the 40 Wrighton EMSL omics_processing and data object documents that relate to isolates

In [109]:
mfilter = {
    "part_of": ["gold:Gs0114675"],
    "processing_institution": "Environmental Molecular Sciences Lab"
}

db.omics_processing_set.count_documents(filter=mfilter)

40

In [110]:
from pymongo import DeleteMany
from toolz import concat

requests = []


docs = list(db.omics_processing_set.find(mfilter, ["id", "has_output"]))
omics_processing_ids = [d["id"] for d in docs]
data_object_ids = list(concat(d["has_output"] for d in docs))

assert len(omics_processing_ids) == db.data_object_set.count_documents({"id": {"$in": data_object_ids}})

In [111]:
rv1 = db.omics_processing_set.delete_many({"id": {"$in": omics_processing_ids}})
rv2 = db.data_object_set.delete_many({"id": {"$in": data_object_ids}})

In [112]:
rv1.deleted_count, rv2.deleted_count

(40, 40)

# ghissue_272.ipynb

add 5 Brodie samples used at EMSL but not JGI

In [113]:
import re
from datetime import datetime

# GOLD timestamps look like "22-Jun-18 04.28.47.015000 PM": day-month-year, a
# dot-separated 12-hour time, a fractional-seconds field and an AM/PM marker.
dt_pattern = re.compile(r"\d{2}-(?P<month>\w+)-\d{2} \d{2}\.\d{2}\.\d{2}\.(?P<ns>\d+) [AP]M")
dt_format = "%d-%b-%y %I.%M.%S.%f %p"

def gold_dtstr_to_iso8601(s):
    """Convert a GOLD timestamp string to an ISO 8601 calendar date.

    Args:
        s: timestamp text such as ``"22-Jun-18 04.28.47.015000 PM"``.

    Returns:
        The date part formatted as ``YYYY-MM-DD``; the time of day is dropped.

    Raises:
        ValueError: if ``s`` contains no GOLD-style timestamp, or if the
            normalised string cannot be parsed with ``dt_format``.
    """
    match = dt_pattern.search(s)
    if match is None:
        raise ValueError(f"not a GOLD timestamp: {s!r}")
    # Normalise the month to "Jun"-style capitalisation for %b.
    month = match.group("month").capitalize()
    # %f accepts at most six digits, so keep only the microsecond part.
    fraction = match.group("ns")[:6]
    # Splice by match position so that only the matched month and fraction
    # fields are rewritten, never identical text elsewhere in the string.
    s_new = (
        s[:match.start("month")] + month
        + s[match.end("month"):match.start("ns")] + fraction
        + s[match.end("ns"):]
    )
    dt = datetime.strptime(s_new, dt_format)
    return dt.strftime("%Y-%m-%d")

In [114]:
# The five samples carry identical metadata and differ only in their IGSN, so
# one fresh document (with its own nested dicts) is built per sample id.
docs = [
   {
      "name":"Soil microbial communities from the East River watershed near Crested Butte, Colorado, United States - ",
      "description":"Soil microbial communities from the East River watershed near Crested Butte, Colorado, United States",
      "lat_lon":{
         "has_raw_value":"38.9206 -106.9489",
         "latitude":38.9206,
         "longitude":-106.9489
      },
      "geo_loc_name":"USA: Colorado",
      "collection_date":"2017-05-09",
      "env_broad_scale":{
         "has_raw_value":"ENVO_00000446",
         "type":"ControlledTermValue"
      },
      "env_local_scale":{
         "has_raw_value":"ENVO_00000292",
         "type":"ControlledTermValue"
      },
      "env_medium":{
         "has_raw_value":"ENVO_00001998",
         "type":"ControlledTermValue"
      },
      "ecosystem":"Environmental",
      "ecosystem_category":"Terrestrial",
      "ecosystem_type":"Soil",
      "ecosystem_subtype":"Unclassified",
      "specific_ecosystem":"Unclassified",
      "depth":{"has_numeric_value": 15},
      "ncbi_taxonomy_name":"soil metagenome",
      "community":"microbial communities",
      "location":"The East River watershed near Crested Butte, Colorado, USA",
      "habitat":"soil",
      "sample_collection_site":"soil",
      "add_date":"22-Jun-18 04.28.47.015000 PM",
      "mod_date":"01-Oct-19 09.41.01.459000 AM",
      "id":sample_id,
      "identifier":sample_id
   }
   for sample_id in (
      "igsn:IEWFS000I",
      "igsn:IEWFS000K",
      "igsn:IEWFS000B",
      "igsn:IEWFS000A",
      "igsn:IEWFS000J",
   )
]

In [115]:
from toolz import get_in, assoc_in

def re_raw_value(doc, key):
    """Wrap a plain value stored under ``key`` as ``{"has_raw_value": value}``.

    Args:
        doc: the document (a dict) to inspect.
        key: the top-level field to wrap.

    Returns:
        ``doc`` itself when ``key`` is missing, ``None`` or already a dict;
        otherwise a new dict in which ``key`` holds the wrapped value. The
        input document is never modified.
    """
    value = doc.get(key)
    if value is None or isinstance(value, dict):
        return doc
    # Build a copy instead of deleting from the caller's dict. The wrapped
    # field is appended last, matching the key order the previous
    # delete-then-reinsert implementation produced.
    wrapped = {k: v for k, v in doc.items() if k != key}
    wrapped[key] = {"has_raw_value": value}
    return wrapped

_biosample_properties = nmdc_jsonschema['definitions']['Biosample']['properties']

def _biosample_fields_referencing(type_name):
    """List the Biosample properties whose ``$ref`` ends with ``type_name``."""
    return [
        prop for prop, spec in _biosample_properties.items()
        if '$ref' in spec and spec["$ref"].endswith(type_name)
    ]

timestampvalue_fields = _biosample_fields_referencing("TimestampValue")
textvalue_fields = _biosample_fields_referencing("TextValue")
quantityvalue_fields = _biosample_fields_referencing("QuantityValue")

# Wrap any plain value found under one of these fields as {"has_raw_value": ...}.
for key in timestampvalue_fields + textvalue_fields + quantityvalue_fields:
    docs = [re_raw_value(sample, key) for sample in docs]

In [116]:
for d in docs:
    d["add_date"] = gold_dtstr_to_iso8601(d["add_date"])
    d["mod_date"] = gold_dtstr_to_iso8601(d["mod_date"])

In [117]:
ok = nmdc_jsonschema_validate({"biosample_set": docs})

In [119]:
rv = mongo.add_docs({"biosample_set": docs})

In [120]:
assert db.biosample_set.count_documents({"id": {"$in": [d["id"] for d in docs]}}) == len(docs)

# ensure_biosample_set_study_id.ipynb

In [121]:
# Done via biosample_set.part_of field ("Relates the biosample to the study for which the sample was collected.").

In [122]:
db.biosample_set.create_index("part_of")

'part_of_1'

In [123]:
stegen = "gold:Gs0114663"
wrighton = "gold:Gs0114675"
brodie = "gold:Gs0135149"

In [124]:
db.biosample_set.count_documents({"part_of": brodie})

48

In [125]:
db.biosample_set.count_documents({"part_of": stegen})

50

In [126]:
db.biosample_set.count_documents({"part_of": wrighton})

25

In [127]:
db.biosample_set.count_documents({"part_of": {"$in": [brodie, stegen, wrighton]}})

123

# Remove omics_processing without has_input

In [128]:
to_delete = [d["id"] for d in db.omics_processing_set.find({"has_input": "emsl:TBD"}, ["id"])]
print(len(to_delete))

1336


In [129]:
from pymongo import DeleteMany

rv = db.omics_processing_set.bulk_write([DeleteMany({"id": {"$in": to_delete}})])

In [130]:
rv.deleted_count

1336

# add metaT files

In [131]:
from collections import defaultdict
import json
from pathlib import Path
import re

import requests

from nmdc_runtime.util import nmdc_jsonschema_validate

# Splits an http(s) URL into its host ("domain") and everything after the
# first slash that follows it ("path").
pattern = re.compile(r"https?://(?P<domain>[^/]+)/(?P<path>.+)")

to_fetch = [{
    "url": "https://portal.nersc.gov/project/m3408/meta/metaT_activity.json",
    "type": "metatranscriptome_activity_set" # waiting on PR microbiomedata/nmdc-schema#86
}, {
    "url": "https://portal.nersc.gov/project/m3408/meta/metaT_data_objects.json",
    "type": "data_object_set" # already mongoimported, but good to re-do via notebook
}]

def url_to_name(url):
    """Derive a flat local file name from an http(s) URL.

    The host labels are reversed and joined with dots, the path has its
    slashes replaced by dots, and the two parts are joined with ``__``; e.g.
    ``https://portal.nersc.gov/a/b.json`` -> ``gov.nersc.portal__a.b.json``.

    Raises:
        ValueError: if ``url`` is not an http(s) URL with a non-empty path.
    """
    m = pattern.match(url)
    if m is None:
        raise ValueError(f"expected an http(s) URL with a path, got {url!r}")
    reversed_domain = ".".join(reversed(m.group("domain").split(".")))
    flat_path = m.group("path").replace("/", ".")
    return f"{reversed_domain}__{flat_path}"

def download_them_all(to_fetch, download_dir="/Users/dwinston/Downloads", timeout=60):
    """Download every URL in ``to_fetch`` and save its JSON body locally.

    Args:
        to_fetch: list of dicts, each with a ``"url"`` entry.
        download_dir: directory the files are written to; each file is named
            by ``url_to_name(url)``.
        timeout: seconds to wait on the server before giving up on a request.

    Raises:
        requests.HTTPError: if a server answers with an error status.
    """
    for i, spec in enumerate(to_fetch):
        url = spec["url"]
        name = url_to_name(url)
        print(f"{i+1}/{len(to_fetch)}: fetching {url}")
        rv = requests.get(url, timeout=timeout)
        # Fail loudly on 4xx/5xx instead of saving an error payload as data.
        rv.raise_for_status()
        # Parse before opening the target so a bad body cannot leave behind a
        # truncated file from an earlier, successful download.
        payload = rv.json()
        print(f"saving as {name}")
        with open(f'{download_dir}/{name}', 'w') as f:
            json.dump(payload, f)
            
def fetch_downloaded_json(name):
    """Parse and return the JSON file saved under ``name`` in the downloads folder."""
    saved_file = Path(f'/Users/dwinston/Downloads/{name}')
    return json.loads(saved_file.read_text())
    
def fetch_downloaded_json_given_url(url):
    """Parse and return the locally saved JSON that was downloaded from ``url``."""
    return fetch_downloaded_json(url_to_name(url))

def fetch_validate_and_add(to_fetch):
    """Load the downloaded files, validate them together, then add them to Mongo.

    Each entry of ``to_fetch`` supplies a ``"url"`` (used to locate the saved
    file) and a ``"type"`` (the collection the documents go to). A file that
    holds a single document rather than a list contributes that one document.
    """
    total = len(to_fetch)
    to_add = defaultdict(list)
    for position, spec in enumerate(to_fetch, start=1):
        url = spec["url"]
        file_name = Path(url).name
        collection_name = spec["type"]
        print(f"{position}/{total}: fetching {file_name} ({collection_name})")
        loaded = fetch_downloaded_json_given_url(url)
        to_add[collection_name].extend(loaded if isinstance(loaded, list) else [loaded])
    print("validating")
    nmdc_jsonschema_validate(to_add)
    print("adding")
    # Validation already ran on the combined payload just above.
    mongo.add_docs(to_add, validate=False)
            
download_them_all(to_fetch)
fetch_validate_and_add(to_fetch)

1/2: fetching https://portal.nersc.gov/project/m3408/meta/metaT_activity.json
saving as gov.nersc.portal__project.m3408.meta.metaT_activity.json
2/2: fetching https://portal.nersc.gov/project/m3408/meta/metaT_data_objects.json
saving as gov.nersc.portal__project.m3408.meta.metaT_data_objects.json
1/2: fetching metaT_activity.json (metatranscriptome_activity_set)
2/2: fetching metaT_data_objects.json (data_object_set)
validating
adding


# Create file_type_enum collection
May end up being the same as `object_types`, but keep separate for now

In [132]:
from pydantic import BaseModel

from nmdc_runtime.api.core.idgen import Base32Id, generate_id_unique


class FileTypeEnumBase(BaseModel):
    """Fields describing one file type, before an id has been assigned to it."""
    # short label for the file type
    name: str
    # longer free-text description of the file type
    description: str
    filter: str # JSON-encoded data_object_set mongo collection filter document

class FileTypeEnum(FileTypeEnumBase):
    """A file type (see ``FileTypeEnumBase``) together with its ``Base32Id`` identifier."""
    id: Base32Id

In [133]:
from functools import lru_cache

@lru_cache
def _fte_id(fte_as_str):
    """Generate an id for the ``file_type_enum`` collection.

    ``fte_as_str`` is not read in the body; it only serves as the cache key,
    so repeated calls with the same string return the same id for as long as
    the cache lives.
    """
    new_id = generate_id_unique(mongo.db, "file_type_enum")
    return new_id

def get_fte_id(fte):
    """Return the id for ``fte``, looked up by its JSON serialisation."""
    serialized = fte.json()
    return _fte_id(serialized)

In [134]:
def filter_matches(filter_):
    """Return at most three ``data_object_set`` documents matching ``filter_``."""
    cursor = mongo.db.data_object_set.find(filter_)
    return list(cursor.limit(3))

In [135]:
def _url_regex(regex):
    """Build a data_object_set filter matching documents whose url matches ``regex``."""
    return {"url": {"$regex": regex}}

# (name, description, data_object_set filter) for every known file type.
# Patterns containing backslashes are raw strings: the text stored in each
# filter is unchanged, but the literals no longer rely on invalid escape
# sequences such as "\/" or "\d".
_file_type_specs = [
    (
        "FT ICR-MS analysis results",
        "FT ICR-MS-based metabolite assignment results table",
        {**_url_regex(r"nom\/results"), "description": {"$regex": "FT ICR-MS"}},
    ),
    ("GC-MS Metabolomics Results", "GC-MS-based metabolite assignment results table", _url_regex(r"metabolomics\/results")),
    ("Metaproteomics Workflow Statistics", "Aggregate workflow statistics file", _url_regex("QC_Metrics.tsv")),
    ("Protein Report", "Filtered protein report file", _url_regex("Protein_Report.tsv")),
    ("Peptide Report", "Filtered peptide report file", _url_regex("Peptide_Report.tsv")),
    ("Unfiltered Metaproteomics Results", "MSGFjobs and MASIC output file", _url_regex("MSGFjobs_MASIC_resultant.tsv")),
    ("Read Count and RPKM", "Annotation read count and RPKM per feature JSON", _url_regex(r"metat_out_json\/output.json")),
    ("QC non-rRNA R2", "QC removed rRNA reads (R2) fastq", _url_regex("filtered_R2.fastq")),
    ("QC non-rRNA R1", "QC removed rRNA reads (R1) fastq", _url_regex("filtered_R1.fastq")),
    ("Metagenome Bins", "Metagenome bin contigs fasta", _url_regex(r"bins\.\d+\.fa")),
    ("CheckM Statistics", "CheckM statistics report", _url_regex("checkm_qa.out")),
    ("Krona Plot", "[GOTTCHA2] krona plot HTML file", _url_regex("gottcha2.*krona.html")),
    ("Krona Plot", "[Kraken2] krona plot HTML file", _url_regex("kraken2.*krona.html")),
    ("Classification Report", "[Kraken2] output report file", _url_regex("kraken2.*report.tsv")),
    ("Taxonomic Classification", "[Kraken2] output read classification file", _url_regex("kraken2.*classification.tsv")),
    ("Krona Plot", "[Centrifuge] krona plot HTML file", _url_regex("centrifuge.*krona.html")),
    ("Classification Report", "[Centrifuge] output report file", _url_regex("centrifuge.*report.tsv")),
    ("Taxonomic Classification", "[Centrifuge] output read classification file", _url_regex("centrifuge.*classification.tsv")),
    ("Structural Annotation GFF", "GFF3 format file with structural annotations", _url_regex(r"annotation\/.*structural_annotation\.gff")),
    ("Functional Annotation GFF", "GFF3 format file with functional annotations", _url_regex(r"annotation\/.*functional_annotation\.gff")),
    ("Annotation Amino Acid FASTA", "FASTA amino acid file for annotated proteins", _url_regex(r"annotation.*\.faa")),
    ("Annotation Enzyme Commission", "Tab delimited file for EC annotation", _url_regex("_ec.tsv")),
    ("Annotation KEGG Orthology", "Tab delimited file for KO annotation", _url_regex("_ko.tsv")),
    ("Assembly Coverage BAM", "Sorted bam file of reads mapping back to the final assembly", _url_regex("pairedMapped_sorted.bam")),
    ("Assembly AGP", "An AGP format file describes the assembly", _url_regex("assembly.agp")),
    ("Assembly Scaffolds", "Final assembly scaffolds fasta", _url_regex("assembly_scaffolds.fna")),
    ("Assembly Contigs", "Final assembly contigs fasta", _url_regex("assembly_contigs.fna")),
    ("Assembly Coverage Stats", "Assembled contigs coverage information", _url_regex("mapping_stats.txt")),
    ("Filtered Sequencing Reads", "Reads QC result fastq (clean data)", _url_regex("filtered.fastq.gz")),
    ("QC Statistics", "Reads QC summary statistics", _url_regex("filterStats.txt")),
]

# First the id-less models, with each filter stored as its JSON encoding ...
file_type_enum = [
    FileTypeEnumBase(name=name, description=description, filter=json.dumps(mongo_filter))
    for name, description, mongo_filter in _file_type_specs
]

# ... then the same entries with an id attached to each.
file_type_enum = [
    FileTypeEnum(id=get_fte_id(fte), **fte.dict())
    for fte in file_type_enum
]

In [136]:
db.drop_collection("file_type_enum")
db.file_type_enum.create_index("id", unique=True)
rv = db.file_type_enum.insert_many([fte.dict() for fte in file_type_enum])
len(rv.inserted_ids)

30

In [137]:
def fte_matches(fte):
    """Return every ``data_object_set`` document selected by ``fte.filter``, without ``_id``."""
    mongo_filter = json.loads(fte.filter)
    matches = mongo.db.data_object_set.find(mongo_filter)
    return [dissoc(match, "_id") for match in matches]

In [138]:
from pymongo import UpdateOne

# Tag every data object matched by a file type's filter with that type's id,
# then write the tagged documents back.
for fte_doc in db.file_type_enum.find():
    fte = FileTypeEnum(**fte_doc)
    print(fte.id, fte.name)
    docs = fte_matches(fte)
    if not docs:
        print(f"no docs matching {fte.dict()}")
        continue
    for data_object in docs:
        data_object["data_object_type"] = fte.id
    mongo.add_docs({"data_object_set": docs})

aaa1-tye4-03 FT ICR-MS analysis results
37my-19jk-66 GC-MS Metabolomics Results
xctf-zdw2-22 Metaproteomics Workflow Statistics
2ydr-9qqq-53 Protein Report
e8y8-pvrd-95 Peptide Report
rd87-fv8m-80 Unfiltered Metaproteomics Results
1gqm-wmyp-53 Read Count and RPKM
fwyp-8w70-07 QC non-rRNA R2
hzt1-y5fx-18 QC non-rRNA R1
bm1x-vd6b-06 Metagenome Bins
b575-qc77-75 CheckM Statistics
gch0-zrdx-65 Krona Plot
ayxv-k605-95 Krona Plot
t278-2zf9-95 Classification Report
x534-570e-85 Taxonomic Classification
q043-5qe3-92 Krona Plot
h3zm-dkwj-17 Classification Report
dn7v-gy46-08 Taxonomic Classification
0a1f-q961-86 Structural Annotation GFF
y4km-an75-98 Functional Annotation GFF
fytr-5eb4-04 Annotation Amino Acid FASTA
8f0h-0r37-97 Annotation Enzyme Commission
9yew-qqae-18 Annotation KEGG Orthology
2tk7-fhj6-96 Assembly Coverage BAM
0hna-73pd-79 Assembly AGP
pmg5-4gds-98 Assembly Scaffolds
em2g-mcwt-09 Assembly Contigs
1f4g-xb1f-62 Assembly Coverage Stats
yhcj-ech0-19 Filtered Sequencing Reads
dd2

# Take bins.\d.fa and change name to {gold ID (end of description)}.bins.\d.fa

In [139]:
fte = FileTypeEnum(**db.file_type_enum.find_one({"name": "Metagenome Bins"}))

docs = [dissoc(d, "_id") for d in db.data_object_set.find(json.loads(fte.filter))]

In [140]:
from pathlib import Path

# The GOLD id is the last space-separated token of the description; prefix it
# to the file name taken from the URL.
for d in docs:
    gold_id = d["description"].rpartition(" ")[2]
    file_name = Path(d["url"]).name
    d["name"] = f"{gold_id}.{file_name}"

_ = mongo.add_docs({"data_object_set": docs})
docs[0]

{'id': 'nmdc:e0b7421514f976cb7ad8c343cf3077a9',
 'name': 'gold:Gp0115663.bins.3.fa',
 'description': 'metabat2 binned contig file for gold:Gp0115663',
 'file_size_bytes': 288873,
 'url': 'https://data.microbiomedata.org/data/1781_86101/img_MAGs/metabat-bins/bins.3.fa',
 'type': 'nmdc:DataObject',
 'data_object_type': 'bm1x-vd6b-06'}

# add profile images
https://portal.nersc.gov/project/m3408/profile_images/

In [141]:
# study -> principal_investigator -> profile_image_url

In [142]:
import requests
from toolz import dissoc

base_url = "https://portal.nersc.gov/project/m3408/profile_images/"

# For each study, derive the expected image file name from the PI's name
# ("Eoin Brodie" -> "brodie_eoin.jpg") and attach the URL when it exists.
docs = [dissoc(d, "_id") for d in db.study_set.find()]
for doc in docs:
    pi = doc["principal_investigator"]
    pi_name = pi["has_raw_value"]
    image_name = "_".join(reversed(pi_name.lower().split(" "))) + '.jpg'
    url = base_url + image_name
    # Probe once and reuse the status, so the value printed for a miss is the
    # one that was actually tested and no second request is sent.
    status_code = requests.head(url).status_code
    if status_code == 200:
        pi["profile_image_url"] = url
    else:
        print(pi_name, image_name, status_code)

Dale Pelletier pelletier_dale.jpg 404
Erik Lilleskov lilleskov_erik.jpg 404


In [143]:
ok = nmdc_jsonschema_validate({"study_set": docs})

In [144]:
rv = mongo.add_docs({"study_set": docs})

In [145]:
db.study_set.count_documents({"principal_investigator.profile_image_url": {"$exists": True}})

6

# Correct data_object_set.file_size_bytes

In [146]:
from toolz import dissoc

docs = [dissoc(d, "_id") for d in db.data_object_set.find({"url": {"$exists": True}})]

In [147]:
# do_ids = []

# with open("/Users/dwinston/Downloads/nmdc-data-objects-wrong-size.txt") as f:
#     for line in f:
#         do_id, url = line.strip().split(",")
#         do_ids.append(do_id)
        
# print(f"{len(docs)} listed")

# docs = [dissoc(d, "_id") for d in db.data_object_set.find({"id": {"$in": do_ids}})]
# print(f"{len(docs)} found")

In [148]:
import urllib3

import requests
from toolz import assoc

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def get_file_size_bytes(d, header_ok=True):
    """Look up the size of the file behind ``d["url"]`` with an HTTP HEAD request.

    The request goes to the plain-http form of the URL, follows redirects,
    skips certificate verification and times out after 5 seconds.

    Returns a single-key dict describing the outcome:
        ``{"data": [doc]}``: ``d`` with ``file_size_bytes`` taken from the
            ``Content-Length`` header (only when ``header_ok`` is true);
        ``{"no_ok_response": [id]}``: the status code was not 200;
        ``{"no_header_content_length": [id]}``: no size could be read from
            the headers (or ``header_ok`` is false);
        ``{"error": [(d, message)]}``: anything raised while probing.
    """
    url = d["url"].replace("https://", "http://")
    try:
        response = requests.head(
            url,
            allow_redirects=True,
            verify=False,
            timeout=5,
            # NOTE(review): presumably asks the server not to gzip, so that
            # Content-Length reflects the stored file size; confirm.
            headers={"Accept-Encoding": "gzip;q=0"},
        )
        if response.status_code != 200:
            return {"no_ok_response": [d["id"]]}

        if header_ok and 'Content-Length' in response.headers:
            size = int(response.headers['Content-Length'])
            return {"data": [assoc(d, 'file_size_bytes', size)]}

        # A fallback that downloads the body and measures it is not enabled.
        return {"no_header_content_length": [d["id"]]}
    except Exception as e:
        return {"error": [(d, str(e))]}

In [149]:
# docs = [d["error"][0][0] for d in db["_tmp__get_file_size_bytes"].find({"error": {"$exists": True}})]
# len(docs)

In [152]:
import concurrent.futures

from toolz import concat, merge_with
from tqdm.notebook import tqdm

# Start from an empty scratch collection; one outcome document is stored per
# probed data object.
db.drop_collection("_tmp__get_file_size_bytes")
error_docs = []

# Both context managers clean up even if the loop below raises: the progress
# bar is closed and the executor is shut down.
with tqdm(total=len(docs)) as pbar, concurrent.futures.ThreadPoolExecutor() as executor:
    future_to_doc = {
        executor.submit(get_file_size_bytes, doc): doc
        for doc in docs
    }
    print("created futures...")
    for future in concurrent.futures.as_completed(future_to_doc):
        pbar.update(1)
        doc = future_to_doc[future]
        try:
            payload = future.result()
        except Exception as e:
            # list.append takes exactly one argument, so the document and the
            # error message are recorded together as a tuple.
            error_docs.append((doc, str(e)))
        else:
            db["_tmp__get_file_size_bytes"].insert_one(payload)

  0%|          | 0/9668 [00:00<?, ?it/s]

created futures...


In [153]:
results = [dissoc(d, "_id") for d in db["_tmp__get_file_size_bytes"].find()]
results = merge_with(lambda v: list(concat(v)), *results)

In [154]:
rv = mongo.add_docs({"data_object_set": results["data"]})

In [155]:
len(error_docs)

0

# load genome_feature_set

MetaG features (`/global/project/projectdirs/m3408/www/meta/anno2/*_features.json`) are 155 JSON files totalling  ~26GB. To load them into MongoDB, I
1. Set up a Globus transfer from NERSC DTN to a Globus Connect Personal endpoint on my laptop. I could e.g.
```
$ scp dtn01.nersc.gov:/global/project/projectdirs/m3408/www/meta/anno2/*_features.json .
```
but I chose to use Globus, and it works well.
2. I have a bash script that uses GNU sed to transform each JSON file to a simple JSON Lines file, as expected by `mongoimport`:

```bash
# trim.sh
# Writes a trimmed copy of every *_features.json file into the current
# directory as <basename>.jsonl, running one background job per file.

# Trim one file. The file is taken from the global $datafile set by the loop
# below; the positional argument passed to task is not read in the body.
task(){
    echo $datafile
    # Delete n lines from end *in addition to* last line.
    # Example: n=1 deletes the last line as well as 1 line prior.
    # NOTE(review): the start line is computed as (wc -l) - n + 1, so the
    # example above holds only if `wc -l` reports one less than the number of
    # lines (i.e. the file has no trailing newline) -- confirm.
    n=1
    # Three sed expressions: drop the first two lines, drop everything from the
    # computed line to the end of the file, and replace the first "}," on each
    # remaining line with "}".
    gsed -e '1,2d' -e "$(( $(wc -l < $datafile)-n+1 )),$ d" -e 's/\}\,/\}/' $datafile > $(basename $datafile).jsonl
}

# Launch one background job per input file; the script does not wait for them.
for datafile in ~/globus-nersc/nmdc/m3408/www/meta/anno2/features/*_features.json; do
    task $datafile &
done
```
I use `ps aux | grep "gsed " | wc -l` to monitor the progress of the parallel sed tasks. I found that trying to do this head/tail file trimming by `json.load`ing the files in Python and resaving was quite slow because the JSON files are individually quite large.


3. I have a bash script that `mongoimport`s each filtered JSON Lines file to the database

```bash
# mongoimport.sh
# Note: be sure to remove any remote collection indexes first, to speed up import. Then, re-create indexes.
#
# Imports every *features*.jsonl file in the current directory into the
# nmdc.genome_feature_set collection, one file at a time.
# Credentials are read from the environment so that no secret is stored in
# this script; the script aborts if either variable is unset or empty.
: "${MONGO_USERNAME:?set MONGO_USERNAME to the MongoDB user name}"
: "${MONGO_PASSWORD:?set MONGO_PASSWORD to the MongoDB password}"

n=$(ls *features*.jsonl | wc -l | xargs) # `| xargs` to trim whitespace
i=1
for datafile in *features*.jsonl; do
    echo "($i of $n): $datafile"
    # Quote the file name so names containing spaces are passed as one argument.
    mongoimport -h mongo-ext.nmdc-runtime-dev.polyneme.xyz \
        -u "$MONGO_USERNAME" -p "$MONGO_PASSWORD" --authenticationDatabase admin \
        -d nmdc -c genome_feature_set \
        --numInsertionWorkers 8 \
        "$datafile"
    i=$((i+1))
done
```

specifying multiple (8 in this case) insertion workers per import.

# individual study doc updates (via spreadsheet spec)

"June Sprint post-ETL updates" sheet in NMDC Google Drive

- many of the study pages are missing information that doesn't yet exist in the mongo database.

In [156]:
stegen_study_id = "gold:Gs0114663"
wrighton_study_id = "gold:Gs0114675"
brodie_study_id = "gold:Gs0135149"
bioscales_study_id = "gold:Gs0154044"
microbes_persist_sfa_study_id = "gold:Gs0128850"
plant_microbe_interfaces_sfa_study_id = "gold:Gs0103573"
spruce_study_id = "gold:Gs0110138"
watershed_sfa_study_id = "gold:Gs0149986"

In [157]:
from pprint import pprint

from nmdc_runtime.api.core.util import pick

def study_summary(doc):
    """Return the result of ``pick`` for the summary fields of a study document.

    The fields requested are ``id``, ``principal_investigator``, ``name``,
    ``publications`` and ``websites``.
    """
    summary_fields = ["id", "principal_investigator", "name", "publications", "websites"]
    return pick(summary_fields, doc)

In [158]:
doc = db.study_set.find_one({"id": watershed_sfa_study_id})
pprint(study_summary(doc))

{'id': 'gold:Gs0149986',
 'name': 'Soil microbial communities from watershed of Upper East River, '
         'Colorado, USA',
 'principal_investigator': {'has_raw_value': 'Eoin Brodie',
                            'profile_image_url': 'https://portal.nersc.gov/project/m3408/profile_images/brodie_eoin.jpg'}}


In [159]:
def _study_update_command(study_id, update_doc, comment):
    """Build one MongoDB ``update`` command targeting a single study.

    Args:
        study_id: value matched against the study document's ``id`` field.
        update_doc: the update document (e.g. ``{"$set": {...}}``) to apply.
        comment: free-text comment attached to the command.

    Returns:
        A dict whose first key is ``"update"`` (the command name), followed by
        ``"updates"`` and ``"comment"``.
    """
    return {
        "update": "study_set",
        "updates": [{"q": {"id": study_id}, "u": update_doc}],
        "comment": comment,
    }


# One command per spreadsheet row; the "row N" comments give the row number.
commands = [
    # row 6
    _study_update_command(
        stegen_study_id,
        {"$addToSet": {"publications": "https://doi.org/10.1371/journal.pone.0228165"}},
        "add to studies/stegen/publications",
    ),
    # row 7
    _study_update_command(
        stegen_study_id,
        {"$set": {"description": """\
This research project, led by James Stegen at PNNL, aimed to understand how molecular-scale processes govern the biogeochemical function of subsurface groundwater-surface water mixing zones (i.e., the hyporheic zone). This project was conducted along the Columbia River in Eastern Washington State, which exhibits variation in microbiome composition, biogeochemical activity, and substrate biogeochemistry, making it an ideal environment for studying biogeochemical hotspots. To capture a range of biogeochemical activities, samples were collected from areas with dense vegetation and virtually no vegetation.

This project’s long-term goal is to develop models that can simulate impacts of disturbance on river corridor hydro-biogeochemistry by understanding fundamental molecular processes that lead to emergent function. This project is part of PNNL’s River Corridor Hydrobiogeochemistry Science Focus Area (https://www.pnnl.gov/projects/river-corridor-hydrobiogeochemistry-science-focus-area).
"""}},
        "replace studies/stegen/description",
    ),
    # row 8
    _study_update_command(
        wrighton_study_id,
        {"$set": {"description": """\
This project aims to improve the understanding of microbial diversity and metabolism in deep shale, with implications for novel enzyme discovery and energy development. This project was conducted along two Appalachian basin shales, the Marcellus and Utica/Point Pleasant formations in Pennsylvania and Ohio, respectively. Samples were collected from input and produced fluids up to a year after hydraulic fracturing at varying depths and locations (4 wells, 2 basin shales).
"""}},
        "replace studies/wrighton/description",
    ),
    # row 9
    _study_update_command(
        brodie_study_id,
        {"$addToSet": {"publications": "https://doi.org/10.21952/WTR/1573029"}},
        "add to studies/brodie/publications",
    ),
    # row 10
    _study_update_command(
        brodie_study_id,
        {"$set": {"description": """\
This research project aimed to understand how snow accumulation and snowmelt influences the mobilization of nitrogen through the soil microbiome in a mountainous catchment at the East River Watershed in Colorado. This project sought to identify bacteria, archaea, and fungi that were associated with the microbial biomass bloom that occurs during winter and the biomass crash following snowmelt. This project also sought to understand whether the traits that govern microbial community assembly during and after snowmelt were phylogenetically conserved. Samples were collected during winter, the snowmelt period, and after snowmelt in spring, from an area that transitioned from an upland hillslope to a riparian floodplain.

This project is part of the Watershed Function Science Focus Area: https://watershed.lbl.gov/.
"""}},
        "replace studies/brodie/description",
    ),
    # row 11
    _study_update_command(
        bioscales_study_id,
        {"$set": {"description": """\
The goal of this Bio-Scales Pilot Project study is to understand how plant traits modify the microbiome and in particular how the coupled plant-soil-microbial system influences nitrogen transformation patterns and fluxes.
"""}},
        "replace studies/bioscales/description",
    ),
    # row 12
    _study_update_command(
        bioscales_study_id,
        {"$addToSet": {"websites": {"$each": [
            "https://www.ornl.gov/staff-profile/mitchel-j-doktycz",
            "https://www.ornl.gov/section/bioimaging-and-analytics",
            "https://pmiweb.ornl.gov/",
            "https://www.ornl.gov/project/bio-scales",
        ]}}},
        "add to studies/bioscales/websites",
    ),
    # row 14 (row 13 done elsewhere)
    _study_update_command(
        bioscales_study_id,
        {"$set": {"principal_investigator.has_raw_value": "Mitchel J. Doktycz"}},
        "replace studies/bioscales/principal_investigator name",
    ),
    # row 15
    _study_update_command(
        microbes_persist_sfa_study_id,
        {"$addToSet": {"websites": "https://sc-programs.llnl.gov/biological-and-environmental-research-at-llnl/soil-microbiome"}},
        "add to studies/microbes_persist_sfa/websites",
    ),
    # row 16
    _study_update_command(
        microbes_persist_sfa_study_id,
        {"$set": {"description": """\
The Microbes Persist: Systems Biology of the Soil Microbiome SFA seeks to understand how microbial ecophysiology, population dynamics, and microbe–mineral–organic matter interactions regulate the persistence of microbial residues in soil under changing moisture regimes.
"""}},
        "replace studies/microbes_persist_sfa/description",
    ),
    # row 19 (rows 17 and 18 done elsewhere)
    _study_update_command(
        plant_microbe_interfaces_sfa_study_id,
        {"$addToSet": {"websites": "https://pmiweb.ornl.gov/pmi-project-aims/"}},
        "add to studies/plant_microbe_interfaces_sfa/websites",
    ),
    # row 20
    _study_update_command(
        plant_microbe_interfaces_sfa_study_id,
        {"$set": {"description": """\
The goal of the Plant-Microbe Interfaces SFA is to gain a deeper understanding of the diversity and functioning of mutually beneficial interactions between plants and microbes in the rhizosphere.
"""}},
        "set studies/plant_microbe_interfaces_sfa/description",
    ),
    # row 21
    _study_update_command(
        spruce_study_id,
        {"$addToSet": {"websites": "https://mnspruce.ornl.gov/project/overview"}},
        "add to studies/spruce/websites",
    ),
    # row 22
    _study_update_command(
        spruce_study_id,
        {"$set": {"description": """\
The Spruce and Peatland Responses Under Changing Environments (SPRUCE) experiment is the primary component of the Terrestrial Ecosystem Science Scientific Focus Area of ORNL's Climate Change Program, focused on terrestrial ecosystems and the mechanisms that underlie their responses to climatic change. This project seeks to assess the response of northern peatland ecosystems to increases in temperature and exposures to elevated atmospheric CO2 concentrations.
"""}},
        "set studies/spruce/description",
    ),
    # row 23
    _study_update_command(
        watershed_sfa_study_id,
        {"$addToSet": {"websites": "https://watershed.lbl.gov/about/"}},
        "add to studies/watershed_sfa/websites",
    ),
    # row 24
    _study_update_command(
        watershed_sfa_study_id,
        {"$set": {"description": """\
The Watershed Function Scientific SFA is developing a predictive understanding of how mountainous watersheds retain and release water, nutrients, carbon, and metals. In particular, the SFA is developing understanding and tools to measure and predict how droughts, early snowmelt, and other perturbations impact downstream water availability and biogeochemical cycling at episodic to decadal timescales.
"""}},
        "set studies/watershed_sfa/description",
    ),
]

In [160]:
docs = [dissoc(d, "_id") for d in db.study_set.find()]

In [161]:
# Name of the scratch collection that receives a copy of the study documents.
tmp_coll = "_tmp_study_set"
# Drop any scratch collection left over from an earlier run before refilling it.
db.drop_collection(tmp_coll)
db[tmp_coll].insert_many(docs)
# Unique index on "id"; as the cell's last expression, the returned index name is displayed.
db[tmp_coll].create_index("id", unique=True)

'id_1'

In [162]:
from toolz import assoc

# Copy each command, pointing its 'update' target at the scratch collection.
_tmp_commands = [{**c, 'update': tmp_coll} for c in commands]

In [163]:
# Run every retargeted command and keep the server replies for display.
rvs = [db.command(cmd) for cmd in _tmp_commands]
rvs

[{'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0}]

In [164]:
docs_to_validate = [dissoc(d, "_id") for d in db[tmp_coll].find()]

In [165]:
ok = nmdc_jsonschema_validate({"study_set": docs_to_validate})

In [166]:
rv = mongo.add_docs({"study_set": docs_to_validate})

In [167]:
rv["study_set"].modified_count

8

In [168]:
db.drop_collection(tmp_coll)

{'nIndexesWas': 2, 'ns': 'nmdc._tmp_study_set', 'ok': 1.0}

# study metadata updates redux

In [169]:
import csv

# Collect the requested study changes from the CSV file. Rows before the first
# 'study title' row are ignored; that row and every later row are kept.
rows = []
with open("../src/data/2021-07-02-study-changes.csv") as f:
    past_first_title = False
    for row in csv.DictReader(f):
        past_first_title = past_first_title or row['Term'] == 'study title'
        if not past_first_title:
            continue
        study_slug = row["Study"].lower().replace(' ','_').replace('-','_')
        # The field is the last word of the "Term" cell, lower-cased.
        last_term_word = re.findall(r"\w+", row["Term"])[-1].lower()
        rows.append({
            'name': study_slug,
            'id': f'gold:{row["GOLD Study ID"]}',
            'field': last_term_word,
            'value': row['Value'].strip(),
        })

In [170]:
{r['field'] for r in rows}

{'citation', 'description', 'publication', 'title'}

In [171]:
def urlify(doi):
    """Return ``doi`` as a URL string.

    A value that already starts with "http" is returned as is, apart from
    surrounding whitespace being trimmed. Otherwise everything up to and
    including the first ":" (for example a "doi:" prefix) is dropped and the
    remainder is appended to "https://doi.org/".

    Whitespace is stripped from both the input and the extracted suffix, so a
    value such as "doi: 10.1000/xyz" does not yield a URL containing a space.

    Args:
        doi: a DOI string, with or without a prefix, or a full URL.

    Returns:
        The URL as a string.
    """
    doi = doi.strip()
    if doi.startswith("http"):
        return doi
    suffix = doi.split(":", maxsplit=1)[-1].strip()
    return f"https://doi.org/{suffix}"

In [172]:
# Translate each parsed spreadsheet row into a MongoDB `update` command for study_set.
commands = []

for row in rows:
    field = row["field"]
    if field in {"title", "description"}:
        update_doc = {"$set": {field: row["value"]}}
        comment = f'set studies/{row["name"]}/{field}'
    elif field == "citation":
        # A citation is stored under doi.has_raw_value, converted to a URL by urlify.
        update_doc = {"$set": {"doi": {"has_raw_value": urlify(row["value"])}}}
        comment = f'set studies/{row["name"]}/doi'
    elif field == "publication":
        update_doc = {"$addToSet": {"publications": urlify(row["value"])}}
        comment = f'add studies/{row["name"]}/{field}'
    else:
        # Emit no command for an unrecognised field. A command whose "u" is an
        # empty document has no update operators, so it would be treated as a
        # replacement and overwrite the matched study with an empty document.
        print("Unknown field", field)
        continue
    commands.append({
        "update": "study_set",
        "updates": [{"q": {"id": row["id"]}, "u": update_doc}],
        "comment": comment,
    })

In [173]:
from toolz import assoc, dissoc

# Copy study_set (minus "_id") into a fresh scratch collection, then run the
# commands against that copy and display the server replies.
tmp_coll = "_tmp_study_set"
docs = [dissoc(study, "_id") for study in db.study_set.find()]
db.drop_collection(tmp_coll)
db[tmp_coll].insert_many(docs)
db[tmp_coll].create_index("id", unique=True)
_tmp_commands = [assoc(cmd, 'update', tmp_coll) for cmd in commands]
rvs = [db.command(cmd) for cmd in _tmp_commands]
rvs

[{'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0},
 {'n': 1, 'nModified': 1, 'ok': 1.0}]

In [174]:
# Read the updated studies back from the scratch collection, without "_id".
docs_to_validate = [dissoc(d, "_id") for d in db[tmp_coll].find()]
# NOTE(review): `ok` is never inspected — presumably nmdc_jsonschema_validate
# raises on invalid documents; confirm, otherwise invalid docs are still written below.
ok = nmdc_jsonschema_validate({"study_set": docs_to_validate})
# Hand the documents to mongo.add_docs under the "study_set" key and report
# the modified_count of the result returned for that key.
rv = mongo.add_docs({"study_set": docs_to_validate})
print(rv["study_set"].modified_count)
# Remove the scratch collection; as the last expression, the reply is displayed.
db.drop_collection(tmp_coll)

8


{'nIndexesWas': 2, 'ns': 'nmdc._tmp_study_set', 'ok': 1.0}

In [175]:
# TODO get local files to api. Untested

from dagster import build_solid_context
from nmdc_runtime.solids.core import local_file_to_api_object as lftao

context = build_solid_context(
    resources={"mongo": mongo, "runtime_api_site_client": client}
)

storage_path = "../src/data/2021-07-02-study-changes.csv"


def local_file_to_api_object(file_info):
    """Call ``lftao`` with the notebook-level ``context`` and ``file_info``; return its result."""
    api_object = lftao(context, file_info)
    return api_object

#obj = local_file_to_api_object({"storage_path": storage_path, "mime_type": 'text/csv'})

#doc = db.objects.find_one({"id": obj["id"]})
#assert doc["name"] == Path(storage_path).name

# ensure all study_set.doi values are HTTPS URIs

In [176]:
from toolz import assoc_in, dissoc

docs = [dissoc(d, "_id") for d in db.study_set.find()]

In [177]:
# Convert each study's doi.has_raw_value to a URL via urlify. A study without
# a doi (or with an empty has_raw_value) is passed through unchanged rather
# than raising KeyError.
docs = [
    assoc_in(d, ["doi", "has_raw_value"], urlify(d["doi"]["has_raw_value"]))
    if (d.get("doi") or {}).get("has_raw_value")
    else d
    for d in docs
]

In [178]:
ok = nmdc_jsonschema_validate({"study_set": docs})

In [179]:
mongo.add_docs({"study_set": docs})

{'study_set': <pymongo.results.BulkWriteResult at 0x7f89e0c53680>}

# verify study sample counts
From GOLD db

In [181]:
assert db.biosample_set.count_documents({"part_of": "gold:Gs0154044"}) == 217

# embargo studies
Embargo study `gold:Gs0149986`

In [260]:
study_id = "gold:Gs0149986"

In [261]:
ids_biosamples = [d["id"]for d in db.biosample_set.find({"part_of": study_id})]

In [262]:
len(ids_biosamples)

246

In [263]:
ids_omics_processings_via_study = [d["id"] for d in db.omics_processing_set.find({"part_of": study_id})]

In [264]:
len(ids_omics_processings_via_study)

0

In [265]:
ids_omics_processings_via_biosamples = [
    d["id"] for d in db.omics_processing_set.find({"has_input": {"$in": ids_biosamples}})
]

In [266]:
len(ids_omics_processings_via_biosamples)

0

In [267]:
ids_omics_processings = list(set(ids_omics_processings_via_study) | set(ids_omics_processings_via_biosamples))

In [268]:
len(ids_omics_processings)

0

In [269]:
from toolz import concat

# Flatten the "has_output" lists of the selected omics processings into one
# list of data object ids. Only "has_output" is projected, so a document that
# lacks the field comes back without it; .get() makes such a document
# contribute nothing instead of raising KeyError.
ids_data_objects_from_omics_processings = list(concat(
    d.get("has_output", [])
    for d in db.omics_processing_set.find(
        {"id": {"$in": ids_omics_processings}},
        ["has_output"],
    )
))

In [270]:
len(ids_data_objects_from_omics_processings)

0

In [271]:
from tqdm.notebook import tqdm

# Map each activity/assembly collection name to the ids of its documents whose
# "was_informed_by" is one of the selected omics processings.
ids_analyses = {}

for coll_name in tqdm(db.list_collection_names()):
    if not coll_name.endswith(("activity_set", "assembly_set")):
        continue
    print(coll_name)
    collection = db[coll_name]
    # Index the field that the query below filters on.
    collection.create_index("was_informed_by")
    matching = collection.find({"was_informed_by": {"$in": ids_omics_processings}})
    ids_analyses[coll_name] = [d["id"] for d in matching]

  0%|          | 0/31 [00:00<?, ?it/s]

metaproteomics_analysis_activity_set
read_QC_analysis_activity_set
mags_activity_set
metagenome_assembly_set
metatranscriptome_activity_set
nom_analysis_activity_set
metagenome_annotation_activity_set
read_based_analysis_activity_set
activity_set
metabolomics_analysis_activity_set


In [272]:
from toolz import concat

# Flatten the "has_output" lists of the selected analysis documents, across
# all analysis collections, into one list of data object ids. Only "has_output"
# is projected; .get() lets a document without that field contribute nothing
# instead of raising KeyError.
ids_data_objects_from_analyses = []

for coll_name, ids_analysis_set in ids_analyses.items():
    cursor = db[coll_name].find({"id": {"$in": ids_analysis_set}}, ["has_output"])
    ids_data_objects_from_analyses.extend(
        concat(d.get("has_output", []) for d in cursor)
    )

In [273]:
len(ids_data_objects_from_analyses)

0

In [274]:
ids_data_objects = list(set(ids_data_objects_from_omics_processings) | set(ids_data_objects_from_analyses))

In [275]:
len(ids_data_objects), db.data_object_set.count_documents({"id": {"$in": ids_data_objects}})

(0, 0)

In [276]:
assert len(ids_data_objects) == db.data_object_set.count_documents({"id": {"$in": ids_data_objects}})

In [183]:
def db_object_for_study(db, study_id):
    """Placeholder, not yet implemented: ignores both arguments and returns None."""
    pass

In [278]:
db_embargo = mongo.db.client["nmdc_embargo"]

In [284]:
studies = [dissoc(d, "_id") for d in db.study_set.find({"id": study_id})]
biosamples = [dissoc(d, "_id") for d in db.biosample_set.find({"id": {"$in": ids_biosamples}})]

In [285]:
rv = db_embargo.study_set.insert_many(studies)

In [287]:
len(rv.inserted_ids)

1

In [288]:
rv = db_embargo.biosample_set.insert_many(biosamples)

In [289]:
len(rv.inserted_ids)

246

In [290]:
rv = db.study_set.delete_one({"id": study_id})

In [291]:
rv.deleted_count

1

In [294]:
rv = db.biosample_set.delete_many({"id": {"$in": ids_biosamples}})

In [295]:
rv.deleted_count

246