diff --git a/README.md b/README.md index 3c98486..c0bc7b1 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,9 @@ calculate-shannon-entropy calculate-shannon-entropy.py -i aligned_ecolidb.mafft. [--example_reads]: Input of user-given reads. [--subassembler]: Choice of mash or skesa for subassembly in riboSeed. ``` +### Running on an HPC with SGE +Python's multiprocessing does not play well with SGE: to run efficiently, use `--sge` mode, provide the conda environment name with `--sge_env`. FocusDB will write out a bash script for all the assemblies after processing all the reads. `qsub` the script, and when it finishes, re-run focusDB with the same parameters, and it will finish processing the data + ### Included Utilities: #### `combine-focusdb-and-silva` diff --git a/py16db/_version.py b/py16db/_version.py index fefcfeb..0ecedfb 100644 --- a/py16db/_version.py +++ b/py16db/_version.py @@ -1 +1 @@ -__version__ = "0.3.55" +__version__ = "0.3.57" diff --git a/py16db/run_all.py b/py16db/run_all.py index fc337b5..8a1af11 100644 --- a/py16db/run_all.py +++ b/py16db/run_all.py @@ -133,15 +133,6 @@ def get_args(): # pragma: nocover "-h", "--help", action="help", default=argparse.SUPPRESS, help="Displays this help message") - mainargs.add_argument( - "--sge", action="store_true", - help="how to handle resources. In --sge mode a script called " + - "`run_assemblies.sh` is written in the output dir, rather than " + - "dispatching the assemblies with multiprocessing. If running in " + - "--sge mode, your workflow would be focusDB -> qsub " + - "run_assemblies.sh -> focusDB to parse the results. " + - " Future versions might do this automatically", - required=False) parargs.add_argument( "-S", "--n_SRAs", help="max number of SRAs to be run", type=int, required=False) @@ -224,6 +215,7 @@ def get_args(): # pragma: nocover "If desired, this can be relaxed", default=.05, type=float) + jobargs.add_argument( "--njobs", help="how many jobs to run concurrently " + @@ -249,6 +241,51 @@ def get_args(): # pragma: nocover "default 1800s (30 mins)", default=1800, required=False, type=int) + jobargs.add_argument( + "--threads", + action="store", + default=1, type=int, + choices=[1, 2, 4], + help="if your cores are hyperthreaded, set number" + + " threads to the number of threads per processer." + + "If unsure, see 'cat /proc/cpuinfo' under 'cpu " + + "cores', or 'lscpu' under 'Thread(s) per core'." + + ": %(default)s") + jobargs.add_argument( + "--fastqtool", + help="either fastq-dump or fasterq-dump", + default="fasterq-dump", + choices=["fastq-dump", "fasterq-dump"], + required=False) + jobargs.add_argument( + "--subassembler", + help="which program should riboseed " + + "use for sub assemblies", + choices=["spades", "skesa"], + required=False, default="spades") + jobargs.add_argument( + "--sge", action="store_true", + help="how to handle resources. In --sge mode a script called " + + "`run_assemblies.sh` is written in the output dir, rather than " + + "dispatching the assemblies with multiprocessing. If running in " + + "--sge mode, your workflow would be focusDB -> qsub " + + "run_assemblies.sh -> focusDB to parse the results. " + + " Future versions might do this automatically", + required=False) + jobargs.add_argument( + "--sge_env", action="store", + help="conda env to run riboSeed assermblies in.", + required="--sge" in sys.argv) + jobargs.add_argument( + "-v", "--verbosity", dest='verbosity', + action="store", + default=2, type=int, choices=[1, 2, 3, 4, 5], + help="Logger writes debug to file in output dir; " + + "this sets verbosity level sent to stderr. " + + " 1 = debug(), 2 = info(), 3 = warning(), " + + "4 = error() and 5 = critical(); " + + "default: %(default)s") + expargs.add_argument( "--process_partial", help="If fastq-dump (NOT fasterq-dump) times out, " + @@ -266,16 +303,6 @@ def get_args(): # pragma: nocover help="If a partial download is encountered during " + "this run, delete and attempt to re-download", required=False, action="store_true") - jobargs.add_argument( - "--threads", - action="store", - default=1, type=int, - choices=[1, 2, 4], - help="if your cores are hyperthreaded, set number" + - " threads to the number of threads per processer." + - "If unsure, see 'cat /proc/cpuinfo' under 'cpu " + - "cores', or 'lscpu' under 'Thread(s) per core'." + - ": %(default)s") parargs.add_argument( "--maxcov", help="integer for maximum desired read depth" + @@ -287,12 +314,6 @@ def get_args(): # pragma: nocover help="integer for minimum read depth", default=15, required=False, type=int) - jobargs.add_argument( - "--fastqtool", - help="either fastq-dump or fasterq-dump", - default="fasterq-dump", - choices=["fastq-dump", "fasterq-dump"], - required=False) expargs.add_argument( "--custom_reads", help="input of custom reads", nargs='+', @@ -304,12 +325,6 @@ def get_args(): # pragma: nocover expargs.add_argument( "--redo_assembly", action="store_true", help="redo the assembly step, ignoring status file") - jobargs.add_argument( - "--subassembler", - help="which program should riboseed " + - "use for sub assemblies", - choices=["spades", "skesa"], - required=False, default="spades") # this is needed for plentyofbugs, should not be user set hiddenargs.add_argument( "--nstrains", help=argparse.SUPPRESS, @@ -322,18 +337,14 @@ def get_args(): # pragma: nocover "--use_available", action="store_true", help="just use any applicable SRAs already downloaded. " + "This can be useful after sraFind updates, and random " + - "seeding tries to pull other genomes", - ) - jobargs.add_argument( - "-v", "--verbosity", dest='verbosity', - action="store", - default=2, type=int, choices=[1, 2, 3, 4, 5], - help="Logger writes debug to file in output dir; " + - "this sets verbosity level sent to stderr. " + - " 1 = debug(), 2 = info(), 3 = warning(), " + - "4 = error() and 5 = critical(); " + - "default: %(default)s") + "seeding tries to pull other genomes") + args = parser.parse_args() + if args.sge: + # just to be safe + if not args.sge_env.isalnum(): + print("invalid conda env %s" % args.sge_env) + sys.exit(1) args.organism_name = args.organism_name.strip() args.output_dir = args.output_dir.strip() if args.output_dir.count(" ") > 0: @@ -433,6 +444,7 @@ def pob(genomes_dir, readsf, output_dir, maxdist, logger): Uses plentyofbugs, a package that useqs mash to find the best reference genome for draft genome """ + pobcmd = str("plentyofbugs -g {genomes_dir} -f {readsf} -o {output_dir} " + "--downsampling_ammount 1000000").format(**locals()) logger.info('Finding best reference genome: %s', pobcmd) @@ -803,6 +815,8 @@ def process_strain(rawreadsf, rawreadsr, read_length, genomes_dir, # do we want to redo the assembly? if args.redo_assembly: update_status_file(status_file, to_remove=["RIBOSEED COMPLETE"]) + if os.path.exists(ribo_dir): + shutil.rmtree(ribo_dir) # file that will contain riboseed contigs if args.just_seed: ribo_contigs = os.path.join( @@ -813,9 +827,14 @@ def process_strain(rawreadsf, rawreadsr, read_length, genomes_dir, this_output, "riboSeed", "seed", "final_de_fere_novo_assembly", "contigs.fasta") if "RIBOSEED COMPLETE" not in parse_status_file(status_file): - if os.path.exists(ribo_dir): - shutil.rmtree(ribo_dir) - return(riboseed_cmd, ribo_contigs, tax_dict) + if os.path.exists(ribo_contigs): + # after sge run + update_status_file(status_file, message="RIBOSEED COMPLETE") + return (None, ribo_contigs, tax_dict) + else: + if os.path.exists(ribo_dir): + shutil.rmtree(ribo_dir) + return(riboseed_cmd, ribo_contigs, tax_dict) else: logger.info("Skipping riboSeed") return (None, ribo_contigs, tax_dict) @@ -1042,15 +1061,18 @@ def add_key_or_increment(d, k): def write_sge_script(args, ntorun, riboSeed_jobs, script_path): end_message = "Done running assemblies. Rerun focusDB as before to " + \ "will detect the assemblies and finish processing them. Exiting.." - header_lines = ["#!/bin/bash", - "#$ -t 1-%i" % ntorun, - "#$ -tc %i" % args.njobs, - "#$ -cwd", - "#$ -j yes", - "#$ -N focusDB_assembs", - "#$ -pe mpi %i" % args.cores, - "#$ -l h_vmem=%iG" % args.memory, - "set -e"] + header_lines = [ + "#!/bin/bash", + "#$ -t 1-%i" % ntorun, + "#$ -tc %i" % args.njobs, + "#$ -cwd", + "#$ -j yes", + "#$ -N focusDB_assembs", + "#$ -pe mpi %i" % args.cores, + "#$ -l h_vmem=%iG" % args.memory, + "set -e", + "conda activate %s" % args.sge_env + ] with open(script_path, "w") as outf: for l in header_lines: outf.write(l + "\n") @@ -1348,7 +1370,7 @@ def main(): sys.exit() # else logger.info("Processing %i riboSeed runs; this can take a while", - len(riboSeed_jobs)) + n_assemblies_to_run) pool = multiprocessing.Pool(processes=args.njobs) logger.debug("running the following commands:")