Skip to content

Commit

Permalink
fixed env arg
Browse files Browse the repository at this point in the history
  • Loading branch information
nickp60 committed Oct 29, 2019
1 parent 9f5e5d5 commit a241e02
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 56 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ calculate-shannon-entropy calculate-shannon-entropy.py -i aligned_ecolidb.mafft.
[--example_reads]: Input of user-given reads.
[--subassembler]: Choice of mash or skesa for subassembly in riboSeed.
```
### Running on an HPC with SGE
Python's multiprocessing does not play well with SGE: to run efficiently, use `--sge` mode, provide the conda environment name with `--sge_env`. FocusDB will write out a bash script for all the assemblies after processing all the reads. `qsub` the script, and when it finishes, re-run focusDB with the same parameters, and it will finish processing the data


### Included Utilities:
#### `combine-focusdb-and-silva`
Expand Down
2 changes: 1 addition & 1 deletion py16db/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.3.55"
__version__ = "0.3.57"
132 changes: 77 additions & 55 deletions py16db/run_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,6 @@ def get_args(): # pragma: nocover
"-h", "--help",
action="help", default=argparse.SUPPRESS,
help="Displays this help message")
mainargs.add_argument(
"--sge", action="store_true",
help="how to handle resources. In --sge mode a script called " +
"`run_assemblies.sh` is written in the output dir, rather than " +
"dispatching the assemblies with multiprocessing. If running in " +
"--sge mode, your workflow would be focusDB -> qsub " +
"run_assemblies.sh -> focusDB to parse the results. " +
" Future versions might do this automatically",
required=False)
parargs.add_argument(
"-S", "--n_SRAs", help="max number of SRAs to be run",
type=int, required=False)
Expand Down Expand Up @@ -224,6 +215,7 @@ def get_args(): # pragma: nocover
"If desired, this can be relaxed",
default=.05,
type=float)

jobargs.add_argument(
"--njobs",
help="how many jobs to run concurrently " +
Expand All @@ -249,6 +241,51 @@ def get_args(): # pragma: nocover
"default 1800s (30 mins)",
default=1800,
required=False, type=int)
jobargs.add_argument(
"--threads",
action="store",
default=1, type=int,
choices=[1, 2, 4],
help="if your cores are hyperthreaded, set number" +
" threads to the number of threads per processer." +
"If unsure, see 'cat /proc/cpuinfo' under 'cpu " +
"cores', or 'lscpu' under 'Thread(s) per core'." +
": %(default)s")
jobargs.add_argument(
"--fastqtool",
help="either fastq-dump or fasterq-dump",
default="fasterq-dump",
choices=["fastq-dump", "fasterq-dump"],
required=False)
jobargs.add_argument(
"--subassembler",
help="which program should riboseed " +
"use for sub assemblies",
choices=["spades", "skesa"],
required=False, default="spades")
jobargs.add_argument(
"--sge", action="store_true",
help="how to handle resources. In --sge mode a script called " +
"`run_assemblies.sh` is written in the output dir, rather than " +
"dispatching the assemblies with multiprocessing. If running in " +
"--sge mode, your workflow would be focusDB -> qsub " +
"run_assemblies.sh -> focusDB to parse the results. " +
" Future versions might do this automatically",
required=False)
jobargs.add_argument(
"--sge_env", action="store",
help="conda env to run riboSeed assermblies in.",
required="--sge" in sys.argv)
jobargs.add_argument(
"-v", "--verbosity", dest='verbosity',
action="store",
default=2, type=int, choices=[1, 2, 3, 4, 5],
help="Logger writes debug to file in output dir; " +
"this sets verbosity level sent to stderr. " +
" 1 = debug(), 2 = info(), 3 = warning(), " +
"4 = error() and 5 = critical(); " +
"default: %(default)s")

expargs.add_argument(
"--process_partial",
help="If fastq-dump (NOT fasterq-dump) times out, " +
Expand All @@ -266,16 +303,6 @@ def get_args(): # pragma: nocover
help="If a partial download is encountered during " +
"this run, delete and attempt to re-download",
required=False, action="store_true")
jobargs.add_argument(
"--threads",
action="store",
default=1, type=int,
choices=[1, 2, 4],
help="if your cores are hyperthreaded, set number" +
" threads to the number of threads per processer." +
"If unsure, see 'cat /proc/cpuinfo' under 'cpu " +
"cores', or 'lscpu' under 'Thread(s) per core'." +
": %(default)s")
parargs.add_argument(
"--maxcov",
help="integer for maximum desired read depth" +
Expand All @@ -287,12 +314,6 @@ def get_args(): # pragma: nocover
help="integer for minimum read depth",
default=15,
required=False, type=int)
jobargs.add_argument(
"--fastqtool",
help="either fastq-dump or fasterq-dump",
default="fasterq-dump",
choices=["fastq-dump", "fasterq-dump"],
required=False)
expargs.add_argument(
"--custom_reads",
help="input of custom reads", nargs='+',
Expand All @@ -304,12 +325,6 @@ def get_args(): # pragma: nocover
expargs.add_argument(
"--redo_assembly", action="store_true",
help="redo the assembly step, ignoring status file")
jobargs.add_argument(
"--subassembler",
help="which program should riboseed " +
"use for sub assemblies",
choices=["spades", "skesa"],
required=False, default="spades")
# this is needed for plentyofbugs, should not be user set
hiddenargs.add_argument(
"--nstrains", help=argparse.SUPPRESS,
Expand All @@ -322,18 +337,14 @@ def get_args(): # pragma: nocover
"--use_available", action="store_true",
help="just use any applicable SRAs already downloaded. " +
"This can be useful after sraFind updates, and random " +
"seeding tries to pull other genomes",
)
jobargs.add_argument(
"-v", "--verbosity", dest='verbosity',
action="store",
default=2, type=int, choices=[1, 2, 3, 4, 5],
help="Logger writes debug to file in output dir; " +
"this sets verbosity level sent to stderr. " +
" 1 = debug(), 2 = info(), 3 = warning(), " +
"4 = error() and 5 = critical(); " +
"default: %(default)s")
"seeding tries to pull other genomes")

args = parser.parse_args()
if args.sge:
# just to be safe
if not args.sge_env.isalnum():
print("invalid conda env %s" % args.sge_env)
sys.exit(1)
args.organism_name = args.organism_name.strip()
args.output_dir = args.output_dir.strip()
if args.output_dir.count(" ") > 0:
Expand Down Expand Up @@ -433,6 +444,7 @@ def pob(genomes_dir, readsf, output_dir, maxdist, logger):
Uses plentyofbugs, a package that useqs mash to
find the best reference genome for draft genome
"""

pobcmd = str("plentyofbugs -g {genomes_dir} -f {readsf} -o {output_dir} " +
"--downsampling_ammount 1000000").format(**locals())
logger.info('Finding best reference genome: %s', pobcmd)
Expand Down Expand Up @@ -803,6 +815,8 @@ def process_strain(rawreadsf, rawreadsr, read_length, genomes_dir,
# do we want to redo the assembly?
if args.redo_assembly:
update_status_file(status_file, to_remove=["RIBOSEED COMPLETE"])
if os.path.exists(ribo_dir):
shutil.rmtree(ribo_dir)
# file that will contain riboseed contigs
if args.just_seed:
ribo_contigs = os.path.join(
Expand All @@ -813,9 +827,14 @@ def process_strain(rawreadsf, rawreadsr, read_length, genomes_dir,
this_output, "riboSeed", "seed",
"final_de_fere_novo_assembly", "contigs.fasta")
if "RIBOSEED COMPLETE" not in parse_status_file(status_file):
if os.path.exists(ribo_dir):
shutil.rmtree(ribo_dir)
return(riboseed_cmd, ribo_contigs, tax_dict)
if os.path.exists(ribo_contigs):
# after sge run
update_status_file(status_file, message="RIBOSEED COMPLETE")
return (None, ribo_contigs, tax_dict)
else:
if os.path.exists(ribo_dir):
shutil.rmtree(ribo_dir)
return(riboseed_cmd, ribo_contigs, tax_dict)
else:
logger.info("Skipping riboSeed")
return (None, ribo_contigs, tax_dict)
Expand Down Expand Up @@ -1042,15 +1061,18 @@ def add_key_or_increment(d, k):
def write_sge_script(args, ntorun, riboSeed_jobs, script_path):
end_message = "Done running assemblies. Rerun focusDB as before to " + \
"will detect the assemblies and finish processing them. Exiting.."
header_lines = ["#!/bin/bash",
"#$ -t 1-%i" % ntorun,
"#$ -tc %i" % args.njobs,
"#$ -cwd",
"#$ -j yes",
"#$ -N focusDB_assembs",
"#$ -pe mpi %i" % args.cores,
"#$ -l h_vmem=%iG" % args.memory,
"set -e"]
header_lines = [
"#!/bin/bash",
"#$ -t 1-%i" % ntorun,
"#$ -tc %i" % args.njobs,
"#$ -cwd",
"#$ -j yes",
"#$ -N focusDB_assembs",
"#$ -pe mpi %i" % args.cores,
"#$ -l h_vmem=%iG" % args.memory,
"set -e",
"conda activate %s" % args.sge_env
]
with open(script_path, "w") as outf:
for l in header_lines:
outf.write(l + "\n")
Expand Down Expand Up @@ -1348,7 +1370,7 @@ def main():
sys.exit()
# else
logger.info("Processing %i riboSeed runs; this can take a while",
len(riboSeed_jobs))
n_assemblies_to_run)

pool = multiprocessing.Pool(processes=args.njobs)
logger.debug("running the following commands:")
Expand Down

0 comments on commit a241e02

Please sign in to comment.