From 32f55ab793c66fc3ee84b2c8c50203d4ba91ad2e Mon Sep 17 00:00:00 2001
From: Richard Challis
Date: Wed, 7 Apr 2021 09:21:24 +0100
Subject: [PATCH] Write taxon IDs to imported/exceptions files

Fixes #66
---
 src/genomehubs/lib/hub.py   | 128 ++++++++++++++++++++++++++----------
 src/genomehubs/lib/index.py |  39 ++++++-----
 src/genomehubs/lib/taxon.py |  22 ++++---
 3 files changed, 132 insertions(+), 57 deletions(-)

diff --git a/src/genomehubs/lib/hub.py b/src/genomehubs/lib/hub.py
index bd1d5cc0..f0c3596f 100644
--- a/src/genomehubs/lib/hub.py
+++ b/src/genomehubs/lib/hub.py
@@ -1,9 +1,11 @@
 #!/usr/bin/env python3
 """Hub functions."""

+import csv
 import os
 import re
 import sys
+from collections import defaultdict
 from copy import deepcopy
 from pathlib import Path

@@ -422,6 +424,36 @@ def add_attribute_values(existing, new, *, raw=True):
     )


+def strip_comments(data, types):
+    """Strip comment lines from a file stream."""
+    comment_chars = {"#"}
+    if "file" in types and "comment" in types["file"]:
+        comment_chars.update(set(types["file"]["comment"]))
+    for row in data:
+        if row[0] in comment_chars:
+            continue
+        yield row
+
+
+def process_names_file(types, names_file):
+    """Process a taxon names file."""
+    data = tofile.open_file_handle(names_file)
+    names = defaultdict(dict)
+    if data is None:
+        return names
+    delimiters = {"csv": ",", "tsv": "\t"}
+    rows = csv.reader(
+        strip_comments(data, types),
+        delimiter=delimiters[types["file"]["format"]],
+        quotechar='"',
+    )
+    next(rows)
+    for row in rows:
+        name = row[3] if len(row) > 3 else row[1]
+        names[row[2]][row[1]] = {"name": name, "taxon_id": row[0]}
+    return names
+
+
 def validate_types_file(types_file, dir_path):
     """Validate types file."""
     try:
@@ -441,7 +473,8 @@
             defaults["metadata"].update({key: value})
     types.update({"defaults": defaults})
     data = tofile.open_file_handle(Path(dir_path) / types["file"]["name"])
-    return types, data
+    names = process_names_file(types, Path(dir_path) / "names" / types["file"]["name"])
+    return types, data, names


 def set_xrefs(taxon_names, types, row, *, meta=None):
@@ -460,16 +493,8 @@ def set_xrefs(taxon_names, types, row, *, meta=None):
     return names


-def process_row(types, row):
-    """Process a row of data."""
-    data = {
-        "attributes": {},
-        "identifiers": {},
-        "metadata": {},
-        "taxon_names": {},
-        "taxonomy": {},
-        "taxon_attributes": {},
-    }
+def set_row_defaults(types, data):
+    """Set default values for a row."""
     for key in types["defaults"].keys():
         if key in types:
             for entry in types[key].values():
@@ -479,6 +504,10 @@
                 }
             elif key == "metadata":
                 data["metadata"] = {**types["defaults"]["metadata"]}
+
+
+def process_row_values(row, types, data):
+    """Process row values."""
     for group in data.keys():
         if group in types:
             for key, meta in types[group].items():
@@ -504,6 +533,20 @@
                 except Exception as err:
                     LOGGER.warning("Cannot parse row '%s'" % str(row))
                     raise err
+
+
+def process_row(types, names, row):
+    """Process a row of data."""
+    data = {
+        "attributes": {},
+        "identifiers": {},
+        "metadata": {},
+        "taxon_names": {},
+        "taxonomy": {},
+        "taxon_attributes": {},
+    }
+    set_row_defaults(types, data)
+    process_row_values(row, types, data)
     taxon_data = {}
     taxon_types = {}
     if "is_primary_value" in data["metadata"]:
@@ -524,10 +567,18 @@
             )
         else:
             data[attr_type] = []
-    if "taxon_names" in data and data["taxon_names"]:
+    if data["taxon_names"]:
         data["taxon_names"] = set_xrefs(
data["taxon_names"], types["taxon_names"], row, meta=data["metadata"] ) + if data["taxonomy"] and names: + for key in names.keys(): + if key in data["taxonomy"]: + if data["taxonomy"][key] in names[key]: + data["taxonomy"]["taxon_id"] = names[key][data["taxonomy"][key]][ + "taxon_id" + ] + data["taxonomy"][key] = names[key][data["taxonomy"][key]]["name"] return data, taxon_data, taxon_types.get("attributes", {}) @@ -571,23 +622,18 @@ def write_imported_rows(rows, opts, *, types, header=None, label="imported"): tofile.write_file(outfile, data) -def write_spellchecked_taxa(spellings, opts, *, types, header=None): +def write_spellchecked_taxa(spellings, opts, *, types): """Write spellchecked taxa to file.""" - imported = [] exceptions = [] file_key = "%s-exception" % opts["index"] dir_key = "%s-dir" % opts["index"] filepath = Path(types["file"]["name"]) extensions = "".join(filepath.suffixes) file_basename = str(filepath).replace(extensions, "") - for name, matches in spellings.items(): - # enable test condition below if importing spellchecked taxa: - # if len(matches) == 1: - # imported.append([name, matches[0]]) - # else: - exceptions.append([name] + matches) - if imported: - label = "imported" + for name, obj in spellings.items(): + exceptions.append([obj["taxon_id"], name, obj["rank"]] + obj["matches"]) + if exceptions: + label = "exceptions" if file_key in opts and opts[file_key]: outdir = opts[file_key] else: @@ -595,24 +641,40 @@ def write_spellchecked_taxa(spellings, opts, *, types, header=None): os.makedirs(outdir, exist_ok=True) outfile = "%s/%s" % (outdir, "%s.spellcheck.tsv" % file_basename) LOGGER.info( - "Writing %d spelling corrections to %s file '%s'", - len(imported), + "Writing %d spelling suggestions to %s file '%s'", + len(exceptions), label, outfile, ) - tofile.write_file(outfile, [["input", "corrected"]] + imported) - if exceptions: - label = "exceptions" + tofile.write_file( + outfile, [["taxon_id", "input", "rank", "suggested"]] + exceptions + ) + + +def write_imported_taxa(taxa, opts, *, types): + """Write imported taxa to file.""" + imported = [] + file_key = "%s-exception" % opts["index"] + dir_key = "%s-dir" % opts["index"] + filepath = Path(types["file"]["name"]) + extensions = "".join(filepath.suffixes) + file_basename = str(filepath).replace(extensions, "") + for name, arr in taxa.items(): + prefix = "#" if len(arr) > 1 else "" + for obj in arr: + imported.append( + ["%s%s" % (prefix, str(obj["taxon_id"])), name, obj["rank"]] + ) + if imported: if file_key in opts and opts[file_key]: outdir = opts[file_key] else: - outdir = "%s/%s" % (opts[dir_key], label) + outdir = "%s/imported" % opts[dir_key] os.makedirs(outdir, exist_ok=True) - outfile = "%s/%s" % (outdir, "%s.spellcheck.tsv" % file_basename) + outfile = "%s/%s" % (outdir, "%s.taxon_ids.tsv" % file_basename) LOGGER.info( - "Writing %d spelling suggestions to %s file '%s'", - len(exceptions), - label, + "Writing %d taxon_ids to imported file '%s'", + len(imported), outfile, ) - tofile.write_file(outfile, [["input", "suggested"]] + exceptions) + tofile.write_file(outfile, [["taxon_id", "input", "rank"]] + imported) diff --git a/src/genomehubs/lib/index.py b/src/genomehubs/lib/index.py index 32ccea37..75766801 100644 --- a/src/genomehubs/lib/index.py +++ b/src/genomehubs/lib/index.py @@ -72,8 +72,10 @@ from .files import index_metadata from .hub import process_row from .hub import set_column_indices +from .hub import strip_comments from .hub import validate_types_file from .hub import write_imported_rows +from 
 from .hub import write_spellchecked_taxa
 from .taxon import add_names_and_attributes_to_taxa
 from .taxon import fix_missing_ids
@@ -87,18 +89,19 @@ def not_blank(key, obj, blanks):
     return key in obj and obj[key] and obj[key] not in blanks


-def strip_comments(data, types):
-    """Strip comment lines from a file stream."""
-    comment_chars = {"#"}
-    if "file" in types and "comment" in types["file"]:
-        comment_chars.update(set(types["file"]["comment"]))
-    for row in data:
-        if row[0] in comment_chars:
-            continue
-        yield row
+def summarise_imported_taxa(docs, imported_taxa):
+    """Summarise taxon information from a stream of taxon docs."""
+    for entry_id, entry in docs:
+        imported_taxa[entry["scientific_name"]].append(
+            {
+                "taxon_id": entry["taxon_id"],
+                "rank": entry["taxon_rank"],
+            }
+        )
+        yield entry_id, entry


-def index_file(es, types, data, opts):
+def index_file(es, types, names, data, opts):
     """Index a file."""
     delimiters = {"csv": ",", "tsv": "\t"}
     rows = csv.reader(
@@ -123,7 +126,9 @@
     LOGGER.info("Processing rows")
     for row in tqdm(rows):
         try:
-            processed_data, taxon_data, new_taxon_types = process_row(types, row)
+            processed_data, taxon_data, new_taxon_types = process_row(
+                types, names, row
+            )
         except Exception as err:
             print(err)
             failed_rows["None"].append(row)
@@ -179,7 +184,7 @@
                 header=header,
                 spellings=spellings,
             )
-        write_spellchecked_taxa(spellings, opts, types=types, header=header)
+        write_spellchecked_taxa(spellings, opts, types=types)
     if with_ids or create_ids:
         write_imported_rows(
             imported_rows, opts, types=types, header=header, label="imported"
@@ -189,12 +194,14 @@
         docs = add_names_and_attributes_to_taxa(
             es, dict(with_ids), opts, template=taxon_template, blanks=blanks
         )
+        imported_taxa = defaultdict(list)
         index_stream(
             es,
             taxon_template["index_name"],
-            docs,
+            summarise_imported_taxa(docs, imported_taxa),
             _op_type="update",
         )
+        write_imported_taxa(imported_taxa, opts, types=types)
     elif opts["index"] == "assembly":
         # TODO: keep track of taxon_id not found exceptions
         assembly_template = assembly.index_template(taxonomy_name, opts)
@@ -244,22 +251,24 @@
         if data_dir in options["index"]:
             dir_path = options["index"][data_dir]
             for types_file in sorted(Path(dir_path).glob("*.names.yaml")):
-                types, data = validate_types_file(types_file, dir_path)
+                types, data, names = validate_types_file(types_file, dir_path)
                 LOGGER.info("Indexing %s" % types["file"]["name"])
                 index_types(es, index, types, options["index"])
                 index_file(
                     es,
                     types,
+                    names,
                     data,
                     {**options["index"], "index": index, "index_types": index_types},
                 )
             for types_file in sorted(Path(dir_path).glob("*.types.yaml")):
-                types, data = validate_types_file(types_file, dir_path)
+                types, data, names = validate_types_file(types_file, dir_path)
                 LOGGER.info("Indexing %s" % types["file"]["name"])
                 index_types(es, index, types, options["index"])
                 index_file(
                     es,
                     types,
+                    names,
                     data,
                     {**options["index"], "index": index, "index_types": index_types},
                 )
diff --git a/src/genomehubs/lib/taxon.py b/src/genomehubs/lib/taxon.py
index 8cf8ae14..ba93a25f 100644
--- a/src/genomehubs/lib/taxon.py
+++ b/src/genomehubs/lib/taxon.py
@@ -451,10 +451,11 @@ def spellcheck_taxon(es, name, index, rank, taxonomy_index_template, opts, retur
             if option.get("collate_match", False)
         ]
     except KeyError:
-        return None
+        return None, rank, None
     except ValueError:
-        return None
-    if matches and len(matches) > 1:
+        return None, rank, None
+    taxon_id = None
+    if matches:
         taxon_matches = {}
         scientific_name = None
         for match in matches:
@@ -466,14 +467,15 @@
                 es, body, index, taxonomy_index_template, opts, return_type="taxon"
             )
             if len(taxa) > 1:
-                return matches
+                return None, rank, matches
             for taxon in taxa:
                 source = taxon["_source"]
-                taxon_matches[source["taxon_id"]] = source["scientific_name"]
+                taxon_id = source["taxon_id"]
+                taxon_matches[taxon_id] = source["scientific_name"]
                 scientific_name = source["scientific_name"]
         if len(taxon_matches.keys()) == 1:
-            return [scientific_name]
+            return taxon_id, rank, [scientific_name]
-    return matches
+    return None, rank, matches


 def taxon_lookup(es, body, index, taxonomy_index_template, opts, return_type):
@@ -523,11 +525,13 @@
     if name_class in {"any", "spellcheck"}:
         body.update({"id": "taxon_by_any_name"})
         if name_class == "spellcheck":
-            matches = spellcheck_taxon(
+            taxon_id, rank, matches = spellcheck_taxon(
                 es, name, index, rank, taxonomy_index_template, opts, return_type
             )
             if matches:
-                spellings.update({name: matches})
+                spellings.update(
+                    {name: {"matches": matches, "taxon_id": taxon_id, "rank": rank}}
+                )
             return [], name_class
         # Uncomment code below to use suggestion in current import
         # if matches and len(matches) == 1:
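
Note on the names-file lookup this patch introduces: process_names_file expects the
names file to share the data file's format (csv or tsv), with a header row, optional
comment lines, and columns ordered taxon_id, name, rank, then an optional preferred
name. Below is a minimal standalone sketch of the lookup process_row performs with
the resulting dict — the file content and taxa are hypothetical examples, not part
of the patch:

    import csv
    import io
    from collections import defaultdict

    # Hypothetical names file: header row, then taxon_id, name, rank, preferred name
    NAMES_TSV = "taxon_id\tname\trank\tpreferred\n9606\tHuman\tspecies\tHomo sapiens\n"

    names = defaultdict(dict)
    rows = csv.reader(io.StringIO(NAMES_TSV), delimiter="\t")
    next(rows)  # skip the header row, as process_names_file does
    for row in rows:
        name = row[3] if len(row) > 3 else row[1]
        names[row[2]][row[1]] = {"name": name, "taxon_id": row[0]}

    # process_row then pins a taxon_id onto the parsed taxonomy for any rank/name hit
    taxonomy = {"species": "Human"}
    for rank in names.keys():
        if rank in taxonomy and taxonomy[rank] in names[rank]:
            match = names[rank][taxonomy[rank]]
            taxonomy["taxon_id"] = match["taxon_id"]
            taxonomy[rank] = match["name"]

    print(taxonomy)  # {'species': 'Homo sapiens', 'taxon_id': '9606'}

On the output side, write_imported_taxa adds a <basename>.taxon_ids.tsv file under
the imported directory with taxon_id, input and rank columns, prefixing the taxon_id
with "#" when an input name resolved to more than one taxon, and the spellcheck
exceptions file gains taxon_id and rank columns alongside each suggestion.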