From cf23ce34e7943ee23d00e46f6dd2ae17551c58a4 Mon Sep 17 00:00:00 2001 From: Evan Rees <25933122+WiscEvan@users.noreply.github.com> Date: Wed, 12 Jan 2022 14:00:18 -0500 Subject: [PATCH] Refactor autometa-taxonomy-lca (#211) * start to fixing issue-#170 :art: lca entrypoint now has updated output parameter :green_apple::art: update lca.nf with updated entrypoint param * :art::green-apple::snake: WIP * :art::bug: Update entrypoint parameters fixes #170 stores pickled data structures for LCA/RMQ to specified directory :bug: Update entrypoint parameters in autometa.sh workflow for autometa-taxonomy-lca :art::green_apple: Include meta.id in LCA outputs * :bug: Replace LCA(...) instantiation outdir param to cache * :bug: Replace incorrect variable to prevent passing pd.DataFrame to load(...) func in markers.get(...) * :art: change process_low to process_medium in prepare_lca.nf --- autometa/common/markers.py | 2 +- autometa/taxonomy/lca.py | 334 +++++++++++++++++------ autometa/taxonomy/majority_vote.py | 5 +- autometa/taxonomy/ncbi.py | 122 +++++++++ autometa/taxonomy/vote.py | 4 +- modules/local/{lca.nf => prepare_lca.nf} | 22 +- modules/local/reduce_lca.nf | 45 +++ subworkflows/local/bin_contigs.nf | 105 ------- subworkflows/local/lca.nf | 58 ++++ subworkflows/local/taxon_assignment.nf | 5 +- workflows/autometa.sh | 9 +- 11 files changed, 507 insertions(+), 204 deletions(-) rename modules/local/{lca.nf => prepare_lca.nf} (69%) create mode 100644 modules/local/reduce_lca.nf delete mode 100644 subworkflows/local/bin_contigs.nf create mode 100644 subworkflows/local/lca.nf diff --git a/autometa/common/markers.py b/autometa/common/markers.py index 15d4179d7..cad1f2612 100644 --- a/autometa/common/markers.py +++ b/autometa/common/markers.py @@ -193,7 +193,7 @@ def get( ) if not os.path.exists(out) or not os.path.getsize(out): - out = hmmscan.filter_tblout_markers( + df = hmmscan.filter_tblout_markers( infpath=scans, outfpath=out, cutoffs=cutoffs, diff --git a/autometa/taxonomy/lca.py b/autometa/taxonomy/lca.py index 943a539e8..9d34f5f16 100644 --- a/autometa/taxonomy/lca.py +++ b/autometa/taxonomy/lca.py @@ -29,11 +29,10 @@ import functools -import gzip import logging import os -from typing import Dict +from typing import Dict, Set, Tuple from itertools import chain import pandas as pd @@ -60,11 +59,9 @@ class LCA(NCBI): Parameters ---------- dbdir : str - + Path to directory containing files: nodes.dmp, names.dmp, merged.dmp, prot.accession2taxid.gz outdir : str - - usepickle : bool, optional - Whether to serialize intermediate files to disk for later lookup (the default is True). + Output directory path to to serialize intermediate files to disk for later lookup verbose : bool, optional Add verbosity to logging stream (the default is False). 
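The hunks above drop the old implicit `usepickle` behaviour in favour of an explicit cache directory supplied at construction. A minimal usage sketch of the updated class, assuming the `cache` keyword and `prepare_lca()` behaviour introduced later in this patch; the paths are illustrative:

    from autometa.taxonomy.lca import LCA

    # dbdir holds nodes.dmp, names.dmp, merged.dmp and the prot.accession2taxid databases;
    # cache is where the tour/level/occurrence/sparse pickles are written for reuse (illustrative paths).
    lca = LCA(dbdir="ncbi", verbose=True, cache="ncbi/lca_cache")
    lca.prepare_lca()  # builds (or reloads) the Euler tour, level, occurrence and sparse RMQ structures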
@@ -94,18 +91,22 @@ class LCA(NCBI): """ - def __init__(self, dbdir: str, verbose: bool = False): + def __init__(self, dbdir: str, verbose: bool = False, cache: str = None): super().__init__(dbdir, verbose=verbose) self.verbose = verbose self.dbdir = dbdir + self.cache = cache self.disable = False if verbose else True - self.tour_fp = os.path.join(self.dbdir, "tour.pkl.gz") + if self.cache and not os.path.exists(self.cache): + logger.info(f"Created LCA cache dir: {self.cache}") + os.makedirs(self.cache) + self.tour_fp = os.path.join(self.cache, "tour.pkl.gz") self.tour = None - self.level_fp = os.path.join(self.dbdir, "level.pkl.gz") + self.level_fp = os.path.join(self.cache, "level.pkl.gz") self.level = None - self.occurrence_fp = os.path.join(self.dbdir, "occurrence.pkl.gz") + self.occurrence_fp = os.path.join(self.cache, "occurrence.pkl.gz") self.occurrence = None - self.sparse_fp = os.path.join(self.dbdir, "precomputed_lcas.pkl.gz") + self.sparse_fp = os.path.join(self.cache, "precomputed_lcas.pkl.gz") self.sparse = None self.lca_prepared = False @@ -206,9 +207,10 @@ def prepare_tree(self): self.tour = tour self.level = level self.occurrence = occurrence - make_pickle(obj=self.tour, outfpath=self.tour_fp) - make_pickle(obj=self.level, outfpath=self.level_fp) - make_pickle(obj=self.occurrence, outfpath=self.occurrence_fp) + if self.cache: + make_pickle(obj=self.tour, outfpath=self.tour_fp) + make_pickle(obj=self.level, outfpath=self.level_fp) + make_pickle(obj=self.occurrence, outfpath=self.occurrence_fp) return def preprocess_minimums(self): @@ -219,9 +221,10 @@ def preprocess_minimums(self): For more information on these data structures see :func:`~lca.LCA.prepare_tree`. Sparse table size: - n = number of elements in level list - rows range = (0 to n) - columns range = (0 to logn) + + * n = number of elements in level list + * rows range = (0 to n) + * columns range = (0 to logn) Returns ------- @@ -269,7 +272,8 @@ def preprocess_minimums(self): sparse_array[row, col] = lower_index else: sparse_array[row, col] = upper_index - make_pickle(obj=sparse_array, outfpath=self.sparse_fp) + if self.cache: + make_pickle(obj=sparse_array, outfpath=self.sparse_fp) self.sparse = sparse_array return @@ -287,6 +291,7 @@ def prepare_lca(self): self.prepare_tree() self.preprocess_minimums() self.lca_prepared = True + logger.debug(f"LCA data structures prepared") # tour, level, occurrence, sparse all ready return @@ -297,6 +302,7 @@ def lca(self, node1, node2): ---------- node1 : int taxid + node2 : int taxid @@ -346,15 +352,18 @@ def lca(self, node1, node2): # (parent, child) return lca_node[1] - def convert_sseqids_to_taxids(self, sseqids: Dict[str, str]) -> Dict[str, int]: + def convert_sseqids_to_taxids( + self, + sseqids: Dict[str, Set[str]], + ) -> Tuple[Dict[str, Set[int]], pd.DataFrame]: """ - Translates subject sequence ids to taxids from prot.accession2taxid.gz. + Translates subject sequence ids to taxids from prot.accession2taxid.gz and dead_prot.accession2taxid.gz. - Note - ---- - If an accession number is no longer available in prot.accesssion2taxid.gz - (either due to being suppressed, deprecated or removed by NCBI), - then root taxid (1) is returned as the taxid for the corresponding sseqid. + .. note:: + + If an accession number is no longer available in prot.accesssion2taxid.gz + (either due to being suppressed, deprecated or removed by NCBI), + then root taxid (1) is returned as the taxid for the corresponding sseqid. 
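The tour/level/occurrence structures and the sparse table cached above implement the Euler-tour plus range-minimum-query approach to LCA, which `lca(node1, node2)` then queries. A rough sketch of that query, assuming `sparse` is the numpy array built in `preprocess_minimums` and `tour` stores `(parent, child)` pairs; this is illustrative rather than the exact implementation:

    import math

    def lca_query_sketch(tour, level, occurrence, sparse, node1, node2):
        # Range-minimum query over the level array between the first occurrences of the two taxids;
        # the shallowest node visited on that stretch of the Euler tour is the LCA.
        lo, hi = sorted((occurrence[node1], occurrence[node2]))
        power = int(math.floor(math.log2(hi - lo + 1)))   # largest power-of-two block fitting the range
        left = int(sparse[lo, power])                     # index of the minimum level in the left block
        right = int(sparse[hi - 2 ** power + 1, power])   # index of the minimum level in the right block
        idx = left if level[left] <= level[right] else right
        return tour[idx][1]                               # tour holds (parent, child); the child is the LCA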
Parameters ---------- @@ -363,8 +372,8 @@ def convert_sseqids_to_taxids(self, sseqids: Dict[str, str]) -> Dict[str, int]: Returns ------- - dict - {qseqid: {taxid, taxid, ...}, ...} + Tuple[Dict[str, Set[int]], pd.DataFrame] + {qseqid: {taxid, taxid, ...}, ...}, index=range, cols=[qseqid, sseqid, raw_taxid, merged_taxid, cleaned_taxid] Raises ------- @@ -372,50 +381,76 @@ def convert_sseqids_to_taxids(self, sseqids: Dict[str, str]) -> Dict[str, int]: prot.accession2taxid.gz database is required for sseqid to taxid conversion. """ - # We first retrieve all possible sseqids that we will need to convert. - accessions = set( + # We first get all unique sseqids that were retrieved from the blast output + recovered_sseqids = set( chain.from_iterable([qseqid_sseqids for qseqid_sseqids in sseqids.values()]) ) - # "rt" open the database in text mode instead of binary to be handled like a text file - fh = ( - gzip.open(self.accession2taxid_fpath, "rt") - if self.accession2taxid_fpath.endswith(".gz") - else open(self.accession2taxid_fpath) - ) - __ = fh.readline() # remove the first line as it just gives the description - if self.verbose: - logger.debug( - f"Searching for {len(accessions):,} accessions in {os.path.basename(self.accession2taxid_fpath)}. This may take a while..." + + # Check for sseqid in dead_prot.accession2taxid.gz in case an old db was used. + # Any accessions not found in live prot.accession2taxid db *may* be here. + # This *attempts* to prevent sseqids from being assigned root (root taxid=1) + try: + sseqids_to_taxids = self.search_prot_accessions( + accessions=recovered_sseqids, + db="dead", + sseqids_to_taxids=None, ) - n_lines = ( - file_length(self.accession2taxid_fpath, approximate=True) - if self.verbose - else None + dead_sseqids_found = set(sseqids_to_taxids.keys()) + except FileNotFoundError as db_fpath: + logger.warn(f"Skipping taxid conversion from {db_fpath}") + sseqids_to_taxids = None + dead_sseqids_found = set() + + # Now build the mapping from sseqid to taxid from the full/live accession2taxid dbs + # Possibly overwriting any merged accessions to live accessions + sseqids_to_taxids = self.search_prot_accessions( + accessions=recovered_sseqids, + db="full", + sseqids_to_taxids=sseqids_to_taxids, ) - desc = f"Parsing {os.path.basename(self.accession2taxid_fpath)}" - sseqids_to_taxids = {} - for line in tqdm( - fh, disable=self.disable, desc=desc, total=n_lines, leave=False - ): - acc_num, acc_ver, taxid, _ = line.split("\t") - taxid = int(taxid) - if acc_num in accessions: - sseqids_to_taxids.update({acc_num: taxid}) - if acc_ver in accessions: - sseqids_to_taxids.update({acc_ver: taxid}) - fh.close() - # Now translate qseqid sseqids to taxids + + # Remove sseqids: Ignore any sseqids already found + live_sseqids_found = set(sseqids_to_taxids.keys()) + live_sseqids_found -= dead_sseqids_found + recovered_sseqids -= live_sseqids_found + if recovered_sseqids: + logger.warn( + f"sseqids without corresponding taxid: {len(recovered_sseqids):,}" + ) root_taxid = 1 - # Will place taxid as root (i.e. 1) if sseqid is not found in prot.accession2taxid taxids = {} + sseqid_not_found = pd.NA + sseqid_to_taxid_df = pd.DataFrame( + [ + { + "qseqid": qseqid, + "sseqid": sseqid, + "raw_taxid": sseqids_to_taxids.get(sseqid, sseqid_not_found), + } + for qseqid, qseqid_sseqids in sseqids.items() + for sseqid in qseqid_sseqids + ] + ) + # Update taxids if they are old. 
+ sseqid_to_taxid_df["merged_taxid"] = sseqid_to_taxid_df.raw_taxid.map( + lambda tid: self.merged.get(tid, tid) + ) + # If we still have missing taxids, we will set the sseqid value to the root taxid + # fill missing taxids with root_taxid + sseqid_to_taxid_df["cleaned_taxid"] = sseqid_to_taxid_df.merged_taxid.fillna( + root_taxid + ) for qseqid, qseqid_sseqids in sseqids.items(): - qseqid_taxids = [ + # NOTE: we only want to retrieve the set of unique taxids (not a list) for LCA query + qseqid_taxids = { sseqids_to_taxids.get(sseqid, root_taxid) for sseqid in qseqid_sseqids - ] - taxids.update({qseqid: qseqid_taxids}) - return taxids + } + taxids[qseqid] = qseqid_taxids + return taxids, sseqid_to_taxid_df - def reduce_taxids_to_lcas(self, taxids: Dict[str, int]) -> Dict[str, int]: + def reduce_taxids_to_lcas( + self, taxids: Dict[str, Set[int]] + ) -> Tuple[Dict[str, int], pd.DataFrame]: """Retrieves the lowest common ancestor for each set of taxids in of the taxids Parameters @@ -425,36 +460,86 @@ def reduce_taxids_to_lcas(self, taxids: Dict[str, int]) -> Dict[str, int]: Returns ------- - dict - {qseqid: lca, qseqid: lca, ...} + Tuple[Dict[str, int], pd.DataFrame] + {qseqid: lca, qseqid: lca, ...}, pd.DataFrame(index=range, cols=[qseqid, taxids]) """ lca_taxids = {} + missing_taxids = [] root_taxid = 1 + logger.debug(f"Assigning LCAs to {len(taxids):,} ORFs") for qseqid, qseqid_taxids in taxids.items(): # This will convert any deprecated taxids to their current taxids before reduction to LCA. - qseqid_taxids = [self.merged.get(taxid, taxid) for taxid in qseqid_taxids] + # NOTE: we only want to retrieve the set of unique taxids (not a list) for LCA query + qseqid_taxids = {self.merged.get(taxid, taxid) for taxid in qseqid_taxids} lca_taxid = False num_taxids = len(qseqid_taxids) while not lca_taxid: + # Reduce all qseqid taxids to LCA if num_taxids >= 2: try: lca_taxid = functools.reduce( lambda taxid1, taxid2: self.lca(node1=taxid1, node2=taxid2), qseqid_taxids, ) - except ValueError: - logger.error(f"Missing either taxid(s), during LCA retrieval") + except ValueError as err: + logger.warn( + f"Missing taxids during LCA retrieval. Setting {qseqid} ({len(qseqid_taxids):,} ORF taxids) to root taxid." + ) lca_taxid = root_taxid - if num_taxids == root_taxid: + missing_taxids.append( + { + "qseqid": qseqid, + "taxids": ",".join( + [str(taxid) for taxid in qseqid_taxids] + ), + } + ) + # If only one taxid was recovered... This is our LCA + elif num_taxids == 1: lca_taxid = qseqid_taxids.pop() - # Exception handling where input for qseqid contains no taxids - if num_taxids == 0: + else: + # Exception handling where input for qseqid contains no taxids e.g. 
num_taxids == 0: lca_taxid = root_taxid lca_taxids.update({qseqid: lca_taxid}) - return lca_taxids + missing_df = pd.DataFrame(missing_taxids) + return lca_taxids, missing_df + + def read_sseqid_to_taxid_table( + self, sseqid_to_taxid_filepath: str + ) -> Dict[str, Set[int]]: + """Retrieve each qseqid's set of taxids from `sseqid_to_taxid_filepath` for reduction by LCA + + Parameters + ---------- + sseqid_to_taxid_filepath : str + Path to sseqid to taxid table with columns: qseqid, sseqid, raw_taxid, merged_taxid, clean_taxid + + Returns + ------- + Dict[str, Set[int]] + Dictionary keyed by qseqid containing sets of respective `clean` taxid + """ + taxids = {} + with open(sseqid_to_taxid_filepath) as fh: + __ = fh.readline() + for line in fh: + qseqid, *__, clean_taxid = line.strip().split("\t") + clean_taxid = int(clean_taxid) + if qseqid in taxids: + taxids[qseqid].add(clean_taxid) + else: + taxids[qseqid] = set([clean_taxid]) + return taxids - def blast2lca(self, blast: str, out: str, force: bool = False) -> str: + def blast2lca( + self, + blast: str, + out: str, + sseqid_to_taxid_output: str = "", + lca_reduction_log: str = "", + force: bool = False, + ) -> str: """Determine lowest common ancestor of provided amino-acid ORFs. Parameters @@ -463,6 +548,8 @@ def blast2lca(self, blast: str, out: str, force: bool = False) -> str: . out : str . + sseqid_to_taxid_output : str + Path to write qseqids' sseqids and their taxid designations from NCBI databases force : bool, optional Force overwrite of existing `out`. @@ -475,12 +562,33 @@ def blast2lca(self, blast: str, out: str, force: bool = False) -> str: if os.path.exists(out) and os.path.getsize(out) and not force: logger.warning(f"FileAlreadyExists {out}") return out - if not os.path.exists(blast) or not os.path.getsize(blast): - raise FileNotFoundError(blast) blast = os.path.realpath(blast) - sseqids = diamond.parse(results=blast, verbose=self.verbose) - taxids = self.convert_sseqids_to_taxids(sseqids) - lcas = self.reduce_taxids_to_lcas(taxids) + # If sseqid_to_taxid_output exists then we'll retrieve sseqid taxids from this... + if ( + sseqid_to_taxid_output + and os.path.exists(sseqid_to_taxid_output) + and os.path.getsize(sseqid_to_taxid_output) + ): + logger.debug(f"Retrieving taxids from {sseqid_to_taxid_output}") + taxids = self.read_sseqid_to_taxid_table(sseqid_to_taxid_output) + elif not os.path.exists(blast) or not os.path.getsize(blast): + raise FileNotFoundError(blast) + elif os.path.exists(blast) and os.path.getsize(blast): + logger.debug(f"Retrieving taxids from {blast}") + sseqids = diamond.parse(results=blast, verbose=self.verbose) + taxids, sseqid_to_taxid_df = self.convert_sseqids_to_taxids(sseqids) + if sseqid_to_taxid_output: + sseqid_to_taxid_df.to_csv( + sseqid_to_taxid_output, sep="\t", index=False, header=True + ) + logger.info( + f"Wrote {sseqid_to_taxid_df.shape[0]:,} qseqids to {sseqid_to_taxid_output}" + ) + # TODO: merge sseqid_to_taxid_output table with err_qseqids_df + # Alluvial diagrams may be interesting here... 
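`reduce_taxids_to_lcas()` above folds each ORF's taxid set down to a single ancestor with pairwise LCA calls, logging anything that cannot be resolved and assigning it the root taxid. A condensed sketch of that reduction pattern, assuming a pairwise `lca(node1, node2)` method as defined earlier in this module:

    import functools

    def reduce_to_lca_sketch(lca, qseqid_taxids: set) -> int:
        # Fold pairwise LCA across the whole set; a singleton is its own LCA and
        # an empty or unresolvable set falls back to the root taxid (1).
        root_taxid = 1
        if not qseqid_taxids:
            return root_taxid
        if len(qseqid_taxids) == 1:
            return qseqid_taxids.pop()
        try:
            return functools.reduce(lambda t1, t2: lca.lca(node1=t1, node2=t2), qseqid_taxids)
        except ValueError:
            return root_taxid  # a taxid missing from the tree aborts the reduction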
+ lcas, err_qseqids_df = self.reduce_taxids_to_lcas(taxids) + if lca_reduction_log: + err_qseqids_df.to_csv(lca_reduction_log, sep="\t", index=False, header=True) written_lcas = self.write_lcas(lcas=lcas, out=out) return written_lcas @@ -520,6 +628,7 @@ def write_lcas(self, lcas: Dict[str, int], out: str) -> str: nlines += 1 fh.write(lines) fh.close() + logger.info(f"Assigned LCA to {len(lcas):,} ORFs: {out}") return out def parse( @@ -546,10 +655,13 @@ def parse( ------- FileNotFoundError `lca_fpath` does not exist. + FileNotFoundError `orfs_fpath` does not exist. + ValueError If prodigal version is under 2.6, `orfs_fpath` is a required input. + """ logger.debug(f"Parsing LCA table: {lca_fpath}") if not os.path.exists(lca_fpath): @@ -562,7 +674,7 @@ def parse( version = float(".".join(version.split(".")[:2])) else: version = float(version) - if version < 2.6: + if version < 2.6 and not orfs_fpath: raise ValueError("Prodigal version under 2.6 requires orfs_fpath input!") # Create a contig translation dictionary with or without ORFs if orfs_fpath: @@ -605,20 +717,48 @@ def parse( def main(): import argparse + import logging as logger + + logger.basicConfig( + format="[%(asctime)s %(levelname)s] %(name)s: %(message)s", + datefmt="%m/%d/%Y %I:%M:%S %p", + level=logger.DEBUG, + ) parser = argparse.ArgumentParser( description="Script to determine Lowest Common Ancestor", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) - parser.add_argument("--output", help="Path to write LCA results.", required=True) parser.add_argument( "--blast", help="Path to BLAST results table respective to `orfs`. " "(Note: The table provided must be in outfmt=6)", + metavar="filepath", + required=True, + ) + parser.add_argument( + "--dbdir", + help="Path to NCBI databases directory.", + metavar="dirpath", + default=NCBI_DIR, + ) + parser.add_argument( + "--lca-output", + help="Path to write LCA results.", + metavar="filepath", required=True, ) parser.add_argument( - "--dbdir", help="Path to NCBI databases directory.", default=NCBI_DIR + "--sseqid2taxid-output", + help="Path to write qseqids sseqids to taxids translations table", + required=False, + metavar="filepath", + ) + parser.add_argument( + "--lca-error-taxids", + help="Path to write table of blast table qseqids that were assigned root due to a missing taxid", + required=False, + metavar="filepath", ) parser.add_argument( "--verbose", @@ -632,13 +772,43 @@ def main(): action="store_true", default=False, ) + parser.add_argument( + "--cache", help="Path to cache pickled LCA database objects.", metavar="dirpath" + ) + parser.add_argument( + "--only-prepare-cache", + help="Only prepare the LCA database objects and write to provided --cache parameter", + action="store_true", + default=False, + ) + parser.add_argument( + "--force-cache-overwrite", + help="Force overwrite if results already exist.", + action="store_true", + default=False, + ) args = parser.parse_args() - lca = LCA(dbdir=args.dbdir, verbose=args.verbose) + lca = LCA(dbdir=args.dbdir, verbose=args.verbose, cache=args.cache) + + # The pkl objects will be recomputed if the respective process cache file does not exist + if args.force_cache_overwrite: + logger.info("Overwriting cache") + if os.path.exists(lca.tour_fp): + os.remove(lca.tour_fp) + if os.path.exists(lca.sparse_fp): + os.remove(lca.sparse_fp) + + if args.only_prepare_cache: + lca.prepare_lca() + logger.info("Cache prepared") + return lca.blast2lca( blast=args.blast, - out=args.output, + out=args.lca_output, + 
sseqid_to_taxid_output=args.sseqid2taxid_output, + lca_reduction_log=args.lca_error_taxids, force=args.force, ) diff --git a/autometa/taxonomy/majority_vote.py b/autometa/taxonomy/majority_vote.py index a50ae0eaa..01fce499b 100644 --- a/autometa/taxonomy/majority_vote.py +++ b/autometa/taxonomy/majority_vote.py @@ -267,7 +267,6 @@ def majority_vote( ncbi_dir: str, verbose: bool = False, orfs: str = None, - force: bool = False, ) -> str: """Wrapper for modified majority voting algorithm from Autometa 1.0 @@ -292,7 +291,8 @@ def majority_vote( Path to assigned taxids table. """ - lca = LCA(dbdir=ncbi_dir, verbose=verbose) + outdir = os.path.dirname(os.path.realpath(out)) + lca = LCA(dbdir=ncbi_dir, verbose=verbose, cache=outdir) # retrieve lca taxids for each contig classifications = lca.parse(lca_fpath=lca_fpath, orfs_fpath=orfs) # Vote for majority lca taxid from contig lca taxids @@ -304,7 +304,6 @@ def majority_vote( def main(): import argparse - basedir = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) parser = argparse.ArgumentParser( description="Script to assign taxonomy via a modified majority voting" " algorithm.", diff --git a/autometa/taxonomy/ncbi.py b/autometa/taxonomy/ncbi.py index 0cb5f2d6e..c24c05718 100644 --- a/autometa/taxonomy/ncbi.py +++ b/autometa/taxonomy/ncbi.py @@ -24,6 +24,7 @@ """ +import gzip import logging import os import string @@ -34,6 +35,7 @@ from tqdm import tqdm +from autometa.common.utilities import file_length from autometa.common.exceptions import DatabaseOutOfSyncError from autometa.config.utilities import DEFAULT_CONFIG @@ -65,10 +67,13 @@ def __init__(self, dirpath, verbose=False): Parameters ---------- + dirpath : str Path to the database directory + verbose : bool, optional log progress to terminal, by default False + """ self.dirpath = dirpath self.verbose = verbose @@ -76,12 +81,32 @@ def __init__(self, dirpath, verbose=False): self.names_fpath = os.path.join(self.dirpath, "names.dmp") self.nodes_fpath = os.path.join(self.dirpath, "nodes.dmp") self.merged_fpath = os.path.join(self.dirpath, "merged.dmp") + # Set prot.accession2taxid filepath self.accession2taxid_fpath = os.path.join(self.dirpath, "prot.accession2taxid") acc2taxid_gz = ".".join([self.accession2taxid_fpath, "gz"]) if not os.path.exists(self.accession2taxid_fpath) and os.path.exists( acc2taxid_gz ): self.accession2taxid_fpath = acc2taxid_gz + # Set prot.accession2taxid.FULL.gz filepath + self.accession2taxidfull_fpath = os.path.join( + self.dirpath, "prot.accession2taxid.FULL" + ) + acc2taxid_gz = ".".join([self.accession2taxidfull_fpath, "gz"]) + if not os.path.exists(self.accession2taxidfull_fpath) and os.path.exists( + acc2taxid_gz + ): + self.accession2taxidfull_fpath = acc2taxid_gz + # Set dead_prot.accession2taxid filepath + self.dead_accession2taxid_fpath = os.path.join( + self.dirpath, "dead_prot.accession2taxid" + ) + acc2taxid_gz = ".".join([self.dead_accession2taxid_fpath, "gz"]) + if not os.path.exists(self.dead_accession2taxid_fpath) and os.path.exists( + acc2taxid_gz + ): + self.dead_accession2taxid_fpath = acc2taxid_gz + # Check formatting for nr database self.nr_fpath = os.path.join(self.dirpath, "nr.gz") nr_bname = os.path.splitext(os.path.basename(self.nr_fpath))[0] nr_dmnd_fname = ".".join([nr_bname, "dmnd"]) @@ -93,6 +118,7 @@ def __init__(self, dirpath, verbose=False): logger.warning( f"DatabaseWarning: {self.nr_fpath} needs to be formatted for diamond!" 
) + # Setup data structures from nodes.dmp, names.dmp and merged.dmp self.nodes = self.parse_nodes() self.names = self.parse_names() self.merged = self.parse_merged() @@ -450,6 +476,102 @@ def convert_taxid_dtype(self, taxid: int) -> int: raise DatabaseOutOfSyncError(err_message) return taxid + def search_prot_accessions( + self, + accessions: set, + sseqids_to_taxids: Dict[str, int] = None, + db: str = "live", + ) -> Dict[str, int]: + """Search prot.accession2taxid.gz and dead_prot.accession2taxid.gz + + Parameters + ---------- + accessions : set + Set of subject sequence ids retrieved from diamond blastp search (sseqids) + + sseqids_to_taxids : Dict[str, int], optional + Dictionary containing sseqids converted to taxids + + db : str, optional + selection of one of the prot accession to taxid databases from NCBI. Choices are live, dead, full + + * live: prot.accession2taxid.gz + * full: prot.accession2taxid.FULL.gz + * dead: dead_prot.accession2taxid.gz + + Returns + ------- + Dict[str, int] + Dictionary containing sseqids converted to taxids + """ + if not sseqids_to_taxids: + sseqids_to_taxids = {} + if not isinstance(db, str): + raise ValueError(f"db must be a string! Type Given: {type(db)}") + db = db.lower() + choices = { + "live": self.accession2taxid_fpath, + "dead": self.dead_accession2taxid_fpath, + "full": self.accession2taxidfull_fpath, + } + if db not in choices: + raise ValueError(f"db must be one of live, full or dead. Given: {db}") + fpath = choices.get(db) + + # Revert to accession2taxid if FULL is not present + if db == "full" and (not os.path.exists(fpath) or not os.path.getsize(fpath)): + logger.warn( + "prot.accession2taxid.FULL.gz was not found. Reverting to prot.accession2taxid.gz" + ) + logger.warn( + "To achieve greater resolution of your metagenome taxonomy, considering downloading the prot.accession2taxid.FULL.gz database file" + ) + fpath = choices.get("live") + db = "live" + + if not os.path.exists(fpath) or not os.path.getsize(fpath): + raise FileNotFoundError(fpath) + + # "rt" open the database in text mode instead of binary to be handled like a text file + fh = gzip.open(fpath, "rt") if fpath.endswith(".gz") else open(fpath) + filename = os.path.basename(fpath) + # skip the header line + __ = fh.readline() + logger.debug( + f"Searching for {len(accessions):,} accessions in {filename}. This may take a while..." 
+ ) + n_lines = file_length(fpath, approximate=True) if self.verbose else None + desc = f"Parsing {filename}" + converted_sseqid_count = 0 + for line in tqdm( + fh, disable=self.disable, desc=desc, total=n_lines, leave=False + ): + if db == "full": + # FULL format is accession.version\ttaxid\n + acc_num = None # Just in case + acc_ver, taxid = line.strip().split("\t") + else: + # dead and live formats are accession\taccession.version\ttaxid\tgi\n + acc_num, acc_ver, taxid, _ = line.strip().split("\t") + + taxid = int(taxid) + if acc_ver in accessions: + sseqids_to_taxids[acc_ver] = taxid + converted_sseqid_count += 1 + + # So prog will not have to search through the accessions set + if db == "full": + continue + + # Search for base accession if using live or dead accession2taxid databases + if acc_num in accessions: + sseqids_to_taxids[acc_num] = taxid + converted_sseqid_count += 1 + + fh.close() + logger.debug(f"sseqids converted from {filename}: {converted_sseqid_count:,}") + return sseqids_to_taxids + if __name__ == "__main__": print("file containing Autometa NCBI utilities class") diff --git a/autometa/taxonomy/vote.py b/autometa/taxonomy/vote.py index 4c5b3c7df..d8dcbed21 100644 --- a/autometa/taxonomy/vote.py +++ b/autometa/taxonomy/vote.py @@ -124,14 +124,14 @@ def call_orfs(): def blast2lca(): if "lca" not in locals(): - lca = LCA(dbdir=ncbi_dir, verbose=verbose) + lca = LCA(dbdir=ncbi_dir, verbose=verbose, cache=outdir) lca.blast2lca( orfs=prot_orfs, out=lca_fpath, blast=blast, force=force, cpus=cpus ) def majority_vote_lca(out=out): if "lca" not in locals(): - lca = LCA(dbdir=ncbi_dir, verbose=verbose) + lca = LCA(dbdir=ncbi_dir, verbose=verbose, cache=outdir) ctg_lcas = lca.parse(lca_fpath=lca_fpath, orfs_fpath=prot_orfs) votes = majority_vote.rank_taxids(ctg_lcas=ctg_lcas, ncbi=lca, verbose=verbose) out = majority_vote.write_votes(results=votes, out=out) diff --git a/modules/local/lca.nf b/modules/local/prepare_lca.nf similarity index 69% rename from modules/local/lca.nf rename to modules/local/prepare_lca.nf index ed56e8159..261bf4acd 100644 --- a/modules/local/lca.nf +++ b/modules/local/prepare_lca.nf @@ -4,9 +4,9 @@ include { initOptions; saveFiles; getSoftwareName } from './functions' params.options = [:] options = initOptions(params.options) -process LCA { - tag "Finding LCA for ${meta.id}" - label 'process_high' +process PREPARE_LCA { + tag "Preparing db cache from ${blastdb_dir}" + label 'process_medium' publishDir "${params.interim_dir_internal}", mode: params.publish_dir_mode, saveAs: { filename -> saveFiles(filename:filename, options:params.options, publish_dir:getSoftwareName(task.process), meta:[:], publish_by_meta:[]) } @@ -18,19 +18,25 @@ process LCA { container "jason-c-kwan/autometa:${params.autometa_image_tag}" } + storeDir 'db/lca' + cache 'lenient' + input: - tuple val(meta), path(blast) path(blastdb_dir) output: - tuple val(meta), path("${meta.id}.lca.tsv"), emit: lca - path '*.version.txt' , emit: version - + path "cache" , emit: cache + path '*.version.txt' , emit: version script: def software = getSoftwareName(task.process) """ - autometa-taxonomy-lca --blast ${blast} --dbdir ${blastdb_dir} --output ${meta.id}.lca.tsv + autometa-taxonomy-lca \\ + --blast . \\ + --lca-output . 
\\ + --dbdir ${blastdb_dir} \\ + --cache cache \\ + --only-prepare-cache echo "TODO" > autometa.version.txt """ } diff --git a/modules/local/reduce_lca.nf b/modules/local/reduce_lca.nf new file mode 100644 index 000000000..b5902b0d1 --- /dev/null +++ b/modules/local/reduce_lca.nf @@ -0,0 +1,45 @@ +// Import generic module functions +include { initOptions; saveFiles; getSoftwareName } from './functions' + +params.options = [:] +options = initOptions(params.options) + +process REDUCE_LCA { + tag "Finding LCA for ${meta.id}" + label 'process_high' + publishDir "${params.interim_dir_internal}", + mode: params.publish_dir_mode, + saveAs: { filename -> saveFiles(filename:filename, options:params.options, publish_dir:getSoftwareName(task.process), meta:[:], publish_by_meta:[]) } + + conda (params.enable_conda ? "bioconda::autometa" : null) + if (workflow.containerEngine == 'singularity' && !params.singularity_pull_docker_container) { + container "https://depot.galaxyproject.org/singularity/YOUR-TOOL-HERE" + } else { + container "jason-c-kwan/autometa:${params.autometa_image_tag}" + } + + input: + tuple val(meta), path(blast) + path(blastdb_dir) + path(lca_cache) + + output: + tuple val(meta), path("${meta.id}.lca.tsv"), emit: lca + path "${meta.id}.error_taxids.tsv" , emit: error_taxids + path "${meta.id}.sseqid2taxid.tsv" , emit: sseqid_to_taxids + path '*.version.txt' , emit: version + + + script: + def software = getSoftwareName(task.process) + """ + autometa-taxonomy-lca \\ + --blast ${blast} \\ + --dbdir ${blastdb_dir} \\ + --cache ${lca_cache} \\ + --lca-error-taxids ${meta.id}.error_taxids.tsv \\ + --sseqid2taxid-output ${meta.id}.sseqid2taxid.tsv \\ + --lca-output ${meta.id}.lca.tsv + echo "TODO" > autometa.version.txt + """ +} diff --git a/subworkflows/local/bin_contigs.nf b/subworkflows/local/bin_contigs.nf deleted file mode 100644 index 6faf76ffc..000000000 --- a/subworkflows/local/bin_contigs.nf +++ /dev/null @@ -1,105 +0,0 @@ -params.binning_options = [:] -params.unclustered_recruitment_options = [:] -params.binning_summary_options = [:] -params.taxdump_tar_gz_dir = [:] - -include { BINNING } from './../../modules/local/binning.nf' addParams( options: params.binning_options ) -include { UNCLUSTERED_RECRUITMENT } from './../../modules/local/unclustered_recruitment.nf' addParams( options: params.unclustered_recruitment_options ) -include { BINNING_SUMMARY } from './../../modules/local/binning_summary.nf' addParams( options: params.binning_summary_options, taxdump_tar_gz_dir: params.taxdump_tar_gz_dir ) - - -workflow BIN_CONTIGS { - - take: - metagenome - kmers_embedded - kmers_normalized - coverage - gc_content - markers - taxon_assignments - binning_column - - main: - kmers_embedded - .join( - coverage - ) - .join( - gc_content - ) - .join( - markers - ) - .set{metagenome_annotations} - - if (params.taxonomy_aware) { - metagenome_annotations - .join( - taxon_assignments - ) - .set{binning_ch} - } else { - metagenome_annotations - .combine( - taxon_assignments - ) - .set{binning_ch} - } - - BINNING ( - binning_ch - ) - - kmers_normalized - .join( - coverage - ).join( - BINNING.out.binning - ).join( - markers - ) - .set{coverage_binningout_markers} - - if (params.taxonomy_aware) { - coverage_binningout_markers - .join( - taxon_assignments - ) - .set{unclustered_recruitment_ch} - } else { - coverage_binningout_markers - .combine( - taxon_assignments - ) - .set{unclustered_recruitment_ch} - } - - UNCLUSTERED_RECRUITMENT ( - unclustered_recruitment_ch - ) - - BINNING.out.main - .join( - 
markers - ).join( - metagenome - ) - .set{binning_summary_ch} - - BINNING_SUMMARY ( - binning_summary_ch, - binning_column - ) - - emit: - binning = BINNING.out.binning - binning_main = BINNING.out.main - recruitment = UNCLUSTERED_RECRUITMENT.out.binning - recruitment_main = UNCLUSTERED_RECRUITMENT.out.main - all_binning_results = BINNING.out.binning | mix(UNCLUSTERED_RECRUITMENT.out) | collect - summary_stats = BINNING_SUMMARY.out.stats - summary_taxa = BINNING_SUMMARY.out.taxonomies - metabins = BINNING_SUMMARY.out.metabins - -} diff --git a/subworkflows/local/lca.nf b/subworkflows/local/lca.nf new file mode 100644 index 000000000..55be0b962 --- /dev/null +++ b/subworkflows/local/lca.nf @@ -0,0 +1,58 @@ +#!/usr/bin/env nextflow +nextflow.enable.dsl=2 + +params.prepare_lca_options = [:] +params.reduce_lca_options = [:] + +include { PREPARE_LCA } from './../../modules/local/prepare_lca.nf' addParams( options: params.prepare_lca_options ) +include { REDUCE_LCA } from './../../modules/local/reduce_lca.nf' addParams( options: params.reduce_lca_options ) + + +workflow LCA { + + take: + blastp_results + blastp_dbdir + + main: + PREPARE_LCA( + blastp_dbdir + ) + REDUCE_LCA( + blastp_results, + blastp_dbdir, + PREPARE_LCA.out.cache + ) + + emit: + lca = REDUCE_LCA.out.lca + error_taxid = REDUCE_LCA.out.error_taxids + sseqid_to_taxids = REDUCE_LCA.out.sseqid_to_taxids + cache = PREPARE_LCA.out.cache +} + +/* +---------------: Test Command :----------------- +nextflow run -resume $HOME/Autometa/subworkflows/local/lca.nf \\ + --interim_dir_internal nf-test \\ + --publish_dir_mode copy \\ + --input '/path/to/*.blastp.tsv' \\ + --dbdir '/path/to/ncbi/databases/directory' +*/ + +workflow { + blastp_results_ch = Channel + .fromPath(params.input) + .map { row -> + def meta = [:] + meta.id = row.simpleName + return [ meta, row ] + } + + dbdir = Channel.value(params.dbdir) + + LCA( + blastp_results_ch, + dbdir + ) +} diff --git a/subworkflows/local/taxon_assignment.nf b/subworkflows/local/taxon_assignment.nf index dcc5daf35..3a3684eb6 100644 --- a/subworkflows/local/taxon_assignment.nf +++ b/subworkflows/local/taxon_assignment.nf @@ -1,4 +1,5 @@ -params.lca_options = [:] +params.prepare_lca_options = [:] +params.reduce_lca_options = [:] params.majority_vote_options = [:] params.split_kingdoms_options = [:] params.nr_dmnd_dir = [:] @@ -13,7 +14,7 @@ params.large_downloads_permission = [:] include { PREPARE_NR_DB } from './prepare_nr.nf' addParams( debug: params.debug, diamond_makedb_options: params.diamond_makedb_options, nr_dmnd_dir: params.nr_dmnd_dir ) include { PREPARE_TAXONOMY_DATABASES } from './prepare_ncbi_taxinfo.nf' addParams( debug: params.debug, taxdump_tar_gz_dir: params.taxdump_tar_gz_dir, prot_accession2taxid_gz_dir: params.prot_accession2taxid_gz_dir ) -include { LCA } from './../../modules/local/lca.nf' addParams( options: params.lca_options ) +include { LCA } from './lca.nf' addParams( prepare_lca_options: params.prepare_lca_options, reduce_lca_options: params.reduce_lca_options ) include { MAJORITY_VOTE } from './../../modules/local/majority_vote.nf' addParams( options: params.majority_vote_options ) include { SPLIT_KINGDOMS } from './../../modules/local/split_kingdoms.nf' addParams( options: params.split_kingdoms_options ) include { DIAMOND_BLASTP } from './../../modules/local/diamond_blastp.nf' addParams( options: params.diamond_blastp_options ) diff --git a/workflows/autometa.sh b/workflows/autometa.sh index 2a875237d..c78796655 100644 --- a/workflows/autometa.sh +++ 
b/workflows/autometa.sh @@ -109,9 +109,16 @@ done # output: lca="${outdir}/${simpleName}.orfs.lca.tsv" +sseqid2taxid="${outdir}/${simpleName}.orfs.sseqid2taxid.tsv" +errorTaxids="${outdir}/${simpleName}.orfs.errortaxids.tsv" # script: -autometa-taxonomy-lca --blast $blast --dbdir $ncbi --output $lca +autometa-taxonomy-lca \ + --blast $blast \ + --dbdir $ncbi \ + --lca-output $lca \ + --sseqid2taxid-output $sseqid2taxid \ + --lca-error-taxids $errorTaxids # Step 4.2: Perform Modified Majority vote of ORF LCAs for all contigs that returned hits in blast search
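Both the Nextflow modules (PREPARE_LCA / REDUCE_LCA) and the shell workflow above now split LCA assignment into a one-time database-cache preparation step and a per-sample reduction step. A sketch of the equivalent programmatic flow with the refactored Python API; the file names are illustrative:

    from autometa.taxonomy.lca import LCA

    # One-time preparation: build and pickle the LCA data structures into the cache directory.
    lca = LCA(dbdir="ncbi", cache="ncbi/lca_cache")
    lca.prepare_lca()

    # Per-sample reduction: diamond blastp hits -> sseqid-to-taxid table, LCA table, and error log.
    lca_table = lca.blast2lca(
        blast="sample.blastp.tsv",                        # diamond blastp results in outfmt 6
        out="sample.lca.tsv",
        sseqid_to_taxid_output="sample.sseqid2taxid.tsv",
        lca_reduction_log="sample.error_taxids.tsv",
    )
    contig_lcas = lca.parse(lca_fpath=lca_table, orfs_fpath="sample.orfs.faa")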