
Commit 32f55ab
Write taxon IDs to imported/exceptions files
Fixes #66
rjchallis committed Apr 7, 2021
1 parent 64627bd commit 32f55ab
Showing 3 changed files with 132 additions and 57 deletions.
128 changes: 95 additions & 33 deletions src/genomehubs/lib/hub.py
@@ -1,9 +1,11 @@
#!/usr/bin/env python3
"""Hub functions."""

import csv
import os
import re
import sys
from collections import defaultdict
from copy import deepcopy
from pathlib import Path

@@ -422,6 +424,36 @@ def add_attribute_values(existing, new, *, raw=True):
)


def strip_comments(data, types):
"""Strip comment lines from a file stream."""
comment_chars = {"#"}
if "file" in types and "comment" in types["file"]:
comment_chars.update(set(types["file"]["comment"]))
for row in data:
if row[0] in comment_chars:
continue
yield row

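A minimal usage sketch for the generator above (not part of the commit; the sample lines and types values are invented for illustration):

    # strip_comments() filters raw lines before they reach csv.reader
    types = {"file": {"format": "tsv", "comment": ["#"]}}
    lines = ["# a comment line\n", "9606\tHomo sapiens\n"]
    # only the data line survives; comment lines are skipped
    assert list(strip_comments(lines, types)) == ["9606\tHomo sapiens\n"]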

def process_names_file(types, names_file):
"""Process a taxon names file."""
data = tofile.open_file_handle(names_file)
names = defaultdict(dict)
if data is None:
return names
delimiters = {"csv": ",", "tsv": "\t"}
rows = csv.reader(
strip_comments(data, types),
delimiter=delimiters[types["file"]["format"]],
quotechar='"',
)
next(rows)
for row in rows:
name = row[3] if len(row) > 3 else row[1]
names[row[2]][row[1]] = {"name": name, "taxon_id": row[0]}
return names

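Judging by the lookup in process_row further down, each names file row is expected to hold a taxon_id, the input name, its rank, and an optional replacement name. A hedged sketch of the resulting structure (values invented):

    # Hypothetical tab-separated names file row:
    #   9606    H. sapiens    species    Homo sapiens
    # process_names_file() stores it as:
    #   names["species"]["H. sapiens"] == {"name": "Homo sapiens", "taxon_id": "9606"}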

def validate_types_file(types_file, dir_path):
"""Validate types file."""
try:
@@ -441,7 +473,8 @@ def validate_types_file(types_file, dir_path):
defaults["metadata"].update({key: value})
types.update({"defaults": defaults})
data = tofile.open_file_handle(Path(dir_path) / types["file"]["name"])
return types, data
names = process_names_file(types, Path(dir_path) / "names" / types["file"]["name"])
return types, data, names


def set_xrefs(taxon_names, types, row, *, meta=None):
@@ -460,16 +493,8 @@ def set_xrefs(taxon_names, types, row, *, meta=None):
return names


def process_row(types, row):
"""Process a row of data."""
data = {
"attributes": {},
"identifiers": {},
"metadata": {},
"taxon_names": {},
"taxonomy": {},
"taxon_attributes": {},
}
def set_row_defaults(types, data):
"""Set default values for a row."""
for key in types["defaults"].keys():
if key in types:
for entry in types[key].values():
@@ -479,6 +504,10 @@ def process_row(types, row):
}
elif key == "metadata":
data["metadata"] = {**types["defaults"]["metadata"]}

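A small sketch of the metadata branch of set_row_defaults (the types dict is invented; the attribute-defaults branch is elided in this hunk):

    types = {"defaults": {"metadata": {"source": "example-db"}}}
    data = {"metadata": {}, "attributes": {}}
    set_row_defaults(types, data)
    # the defaults are copied into the row's metadata
    assert data["metadata"] == {"source": "example-db"}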

def process_row_values(row, types, data):
"""Process row values."""
for group in data.keys():
if group in types:
for key, meta in types[group].items():
@@ -504,6 +533,20 @@ def process_row(types, row):
except Exception as err:
LOGGER.warning("Cannot parse row '%s'" % str(row))
raise err


def process_row(types, names, row):
"""Process a row of data."""
data = {
"attributes": {},
"identifiers": {},
"metadata": {},
"taxon_names": {},
"taxonomy": {},
"taxon_attributes": {},
}
set_row_defaults(types, data)
process_row_values(row, types, data)
taxon_data = {}
taxon_types = {}
if "is_primary_value" in data["metadata"]:
@@ -524,10 +567,18 @@ def process_row(types, row):
)
else:
data[attr_type] = []
if "taxon_names" in data and data["taxon_names"]:
if data["taxon_names"]:
data["taxon_names"] = set_xrefs(
data["taxon_names"], types["taxon_names"], row, meta=data["metadata"]
)
if data["taxonomy"] and names:
for key in names.keys():
if key in data["taxonomy"]:
if data["taxonomy"][key] in names[key]:
data["taxonomy"]["taxon_id"] = names[key][data["taxonomy"][key]][
"taxon_id"
]
data["taxonomy"][key] = names[key][data["taxonomy"][key]]["name"]
return data, taxon_data, taxon_types.get("attributes", {})

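The new block above is what writes taxon IDs into the parsed row: when a names lookup is supplied, a taxonomy value that matches an input name is replaced by its canonical name and the taxon_id is filled in. A sketch with invented values:

    # Given:
    #   data["taxonomy"] == {"species": "H. sapiens"}
    #   names == {"species": {"H. sapiens": {"name": "Homo sapiens", "taxon_id": "9606"}}}
    # the loop sets:
    #   data["taxonomy"]["taxon_id"] == "9606"
    #   data["taxonomy"]["species"] == "Homo sapiens"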

@@ -571,48 +622,59 @@ def write_imported_rows(rows, opts, *, types, header=None, label="imported"):
tofile.write_file(outfile, data)


def write_spellchecked_taxa(spellings, opts, *, types, header=None):
def write_spellchecked_taxa(spellings, opts, *, types):
"""Write spellchecked taxa to file."""
imported = []
exceptions = []
file_key = "%s-exception" % opts["index"]
dir_key = "%s-dir" % opts["index"]
filepath = Path(types["file"]["name"])
extensions = "".join(filepath.suffixes)
file_basename = str(filepath).replace(extensions, "")
for name, matches in spellings.items():
# enable test condition below if importing spellchecked taxa:
# if len(matches) == 1:
# imported.append([name, matches[0]])
# else:
exceptions.append([name] + matches)
if imported:
label = "imported"
for name, obj in spellings.items():
exceptions.append([obj["taxon_id"], name, obj["rank"]] + obj["matches"])
if exceptions:
label = "exceptions"
if file_key in opts and opts[file_key]:
outdir = opts[file_key]
else:
outdir = "%s/%s" % (opts[dir_key], label)
os.makedirs(outdir, exist_ok=True)
outfile = "%s/%s" % (outdir, "%s.spellcheck.tsv" % file_basename)
LOGGER.info(
"Writing %d spelling corrections to %s file '%s'",
len(imported),
"Writing %d spelling suggestions to %s file '%s'",
len(exceptions),
label,
outfile,
)
tofile.write_file(outfile, [["input", "corrected"]] + imported)
if exceptions:
label = "exceptions"
tofile.write_file(
outfile, [["taxon_id", "input", "rank", "suggested"]] + exceptions
)

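With the reworked columns, each exceptions row now carries the taxon_id (when a unique match was found), the input name, its rank, and the suggested spellings. A sketch of the output TSV (values invented):

    # exceptions/<basename>.spellcheck.tsv
    # taxon_id  input                     rank     suggested
    #           Drosophilla melanogaster  species  Drosophila melanogaster
    # 9606      Homo sapien               species  Homo sapiens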

def write_imported_taxa(taxa, opts, *, types):
"""Write imported taxa to file."""
imported = []
file_key = "%s-exception" % opts["index"]
dir_key = "%s-dir" % opts["index"]
filepath = Path(types["file"]["name"])
extensions = "".join(filepath.suffixes)
file_basename = str(filepath).replace(extensions, "")
for name, arr in taxa.items():
prefix = "#" if len(arr) > 1 else ""
for obj in arr:
imported.append(
["%s%s" % (prefix, str(obj["taxon_id"])), name, obj["rank"]]
)
if imported:
if file_key in opts and opts[file_key]:
outdir = opts[file_key]
else:
outdir = "%s/%s" % (opts[dir_key], label)
outdir = "%s/imported" % opts[dir_key]
os.makedirs(outdir, exist_ok=True)
outfile = "%s/%s" % (outdir, "%s.spellcheck.tsv" % file_basename)
outfile = "%s/%s" % (outdir, "%s.taxon_ids.tsv" % file_basename)
LOGGER.info(
"Writing %d spelling suggestions to %s file '%s'",
len(exceptions),
label,
"Writing %d taxon_ids to imported file '%s'",
len(imported),
outfile,
)
tofile.write_file(outfile, [["input", "suggested"]] + exceptions)
tofile.write_file(outfile, [["taxon_id", "input", "rank"]] + imported)
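One row is written per matched taxon; a name with more than one match gets a "#" prefix on each of its taxon_ids, presumably so those rows read as comment lines (see strip_comments above) if the file is fed back in. A sketch with invented values:

    # imported/<basename>.taxon_ids.tsv
    # taxon_id  input         rank
    # 9606      Homo sapiens  species
    # #9605     Homo          genus    <- "#" marks an ambiguous input name
    # #9606     Homo          genus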
39 changes: 24 additions & 15 deletions src/genomehubs/lib/index.py
@@ -72,8 +72,10 @@
from .files import index_metadata
from .hub import process_row
from .hub import set_column_indices
from .hub import strip_comments
from .hub import validate_types_file
from .hub import write_imported_rows
from .hub import write_imported_taxa
from .hub import write_spellchecked_taxa
from .taxon import add_names_and_attributes_to_taxa
from .taxon import fix_missing_ids
@@ -87,18 +89,19 @@ def not_blank(key, obj, blanks):
return key in obj and obj[key] and obj[key] not in blanks


def strip_comments(data, types):
"""Strip comment lines from a file stream."""
comment_chars = {"#"}
if "file" in types and "comment" in types["file"]:
comment_chars.update(set(types["file"]["comment"]))
for row in data:
if row[0] in comment_chars:
continue
yield row
def summarise_imported_taxa(docs, imported_taxa):
"""Summarise taxon imformation from a stram of taxon docs."""
for entry_id, entry in docs:
imported_taxa[entry["scientific_name"]].append(
{
"taxon_id": entry["taxon_id"],
"rank": entry["taxon_rank"],
}
)
yield entry_id, entry

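summarise_imported_taxa is a pass-through generator: it records each document's scientific name, taxon_id, and rank into imported_taxa as a side effect while yielding the stream unchanged for index_stream. A minimal self-contained sketch (doc values invented):

    from collections import defaultdict

    imported_taxa = defaultdict(list)
    docs = [("t-1", {"scientific_name": "Homo sapiens", "taxon_id": "9606", "taxon_rank": "species"})]
    for entry_id, entry in summarise_imported_taxa(iter(docs), imported_taxa):
        pass  # index_stream() would consume these
    assert imported_taxa["Homo sapiens"] == [{"taxon_id": "9606", "rank": "species"}]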

def index_file(es, types, data, opts):
def index_file(es, types, names, data, opts):
"""Index a file."""
delimiters = {"csv": ",", "tsv": "\t"}
rows = csv.reader(
@@ -123,7 +126,9 @@ def index_file(es, types, data, opts):
LOGGER.info("Processing rows")
for row in tqdm(rows):
try:
processed_data, taxon_data, new_taxon_types = process_row(types, row)
processed_data, taxon_data, new_taxon_types = process_row(
types, names, row
)
except Exception as err:
print(err)
failed_rows["None"].append(row)
@@ -179,7 +184,7 @@ def index_file(es, types, data, opts):
header=header,
spellings=spellings,
)
write_spellchecked_taxa(spellings, opts, types=types, header=header)
write_spellchecked_taxa(spellings, opts, types=types)
if with_ids or create_ids:
write_imported_rows(
imported_rows, opts, types=types, header=header, label="imported"
@@ -189,12 +194,14 @@ def index_file(es, types, data, opts):
docs = add_names_and_attributes_to_taxa(
es, dict(with_ids), opts, template=taxon_template, blanks=blanks
)
imported_taxa = defaultdict(list)
index_stream(
es,
taxon_template["index_name"],
docs,
summarise_imported_taxa(docs, imported_taxa),
_op_type="update",
)
write_imported_taxa(imported_taxa, opts, types=types)
elif opts["index"] == "assembly":
# TODO: keep track of taxon_id not found exceptions
assembly_template = assembly.index_template(taxonomy_name, opts)
@@ -244,22 +251,24 @@ def main(args):
if data_dir in options["index"]:
dir_path = options["index"][data_dir]
for types_file in sorted(Path(dir_path).glob("*.names.yaml")):
types, data = validate_types_file(types_file, dir_path)
types, data, names = validate_types_file(types_file, dir_path)
LOGGER.info("Indexing %s" % types["file"]["name"])
index_types(es, index, types, options["index"])
index_file(
es,
types,
names,
data,
{**options["index"], "index": index, "index_types": index_types},
)
for types_file in sorted(Path(dir_path).glob("*.types.yaml")):
types, data = validate_types_file(types_file, dir_path)
types, data, names = validate_types_file(types_file, dir_path)
LOGGER.info("Indexing %s" % types["file"]["name"])
index_types(es, index, types, options["index"])
index_file(
es,
types,
names,
data,
{**options["index"], "index": index, "index_types": index_types},
)
22 changes: 13 additions & 9 deletions src/genomehubs/lib/taxon.py
@@ -451,10 +451,11 @@ def spellcheck_taxon(es, name, index, rank, taxonomy_index_template, opts, return_type
if option.get("collate_match", False)
]
except KeyError:
return None
return None, rank, None
except ValueError:
return None
if matches and len(matches) > 1:
return None, rank, None
taxon_id = None
if matches:
taxon_matches = {}
scientific_name = None
for match in matches:
@@ -466,14 +467,15 @@
es, body, index, taxonomy_index_template, opts, return_type="taxon"
)
if len(taxa) > 1:
return matches
return None, rank, matches
for taxon in taxa:
source = taxon["_source"]
taxon_matches[source["taxon_id"]] = source["scientific_name"]
taxon_id = source["taxon_id"]
taxon_matches[taxon_id] = source["scientific_name"]
scientific_name = source["scientific_name"]
if len(taxon_matches.keys()) == 1:
return [scientific_name]
return matches
return taxon_id, rank, [scientific_name]
return None, rank, matches

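spellcheck_taxon now returns a (taxon_id, rank, matches) 3-tuple on every path: (None, rank, None) on lookup errors, (taxon_id, rank, [scientific_name]) when all suggestions collapse to a single taxon, and (None, rank, matches) otherwise. Sketched outcomes (values invented):

    # taxon_id, rank, matches = spellcheck_taxon(
    #     es, "Homo sapien", index, "species", taxonomy_index_template, opts, return_type
    # )
    # unique match      -> ("9606", "species", ["Homo sapiens"])
    # ambiguous matches -> (None, "species", ["Homo sapiens", "Homo heidelbergensis"])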

def taxon_lookup(es, body, index, taxonomy_index_template, opts, return_type):
@@ -523,11 +525,13 @@ def lookup_taxon(
if name_class in {"any", "spellcheck"}:
body.update({"id": "taxon_by_any_name"})
if name_class == "spellcheck":
matches = spellcheck_taxon(
taxon_id, rank, matches = spellcheck_taxon(
es, name, index, rank, taxonomy_index_template, opts, return_type
)
if matches:
spellings.update({name: matches})
spellings.update(
{name: {"matches": matches, "taxon_id": taxon_id, "rank": rank}}
)
return [], name_class
# Uncomment code below to use suggestion in current import
# if matches and len(matches) == 1:
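Each spellings entry now carries the fields that write_spellchecked_taxa expects; one entry might look like this (values invented):

    # spellings["Homo sapien"] == {
    #     "matches": ["Homo sapiens"],
    #     "taxon_id": "9606",
    #     "rank": "species",
    # }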
