From 7947f7a06e2a06c687f939e18f98dca51371d2bf Mon Sep 17 00:00:00 2001 From: Chris Tomkins-Tinch Date: Tue, 27 Nov 2018 14:09:50 -0500 Subject: [PATCH] respec snakemake per-rule mem requirement: s/mem/mem_mb/g (#897) * respec snakemake per-rule mem requirement: s/mem/mem_mb/g mem_mb is now the preferred way to specify rule-specific memory; this param is recognized by certain execution engines such as kubernetes. See: https://snakemake.readthedocs.io/en/v5.3.0/executable.html#kubernetes * add run-pipe_local.sh; scale spades mem to 90% of requested per-job * increase spades memory * correct run-pipe_local.sh script description comment --- easy-deploy-script/easy-deploy-viral-ngs.sh | 22 ++++++--- pipes/Broad_LSF/cluster-submitter.py | 2 +- pipes/Broad_LSF/run-pipe.sh | 2 +- pipes/Broad_UGER/cluster-submitter.py | 2 +- pipes/Broad_UGER/run-pipe.sh | 2 +- pipes/rules/assembly.rules | 22 ++++----- pipes/rules/demux.rules | 4 +- pipes/rules/hs_deplete.rules | 13 +++--- pipes/rules/interhost.rules | 12 ++--- pipes/rules/intrahost.rules | 6 +-- pipes/rules/metagenomics.rules | 14 +++--- pipes/rules/ncbi.rules | 2 +- pipes/rules/reports.rules | 4 +- pipes/run-pipe_local.sh | 50 +++++++++++++++++++++ 14 files changed, 111 insertions(+), 46 deletions(-) create mode 100644 pipes/run-pipe_local.sh diff --git a/easy-deploy-script/easy-deploy-viral-ngs.sh b/easy-deploy-script/easy-deploy-viral-ngs.sh index 08c867f98..735db04ff 100755 --- a/easy-deploy-script/easy-deploy-viral-ngs.sh +++ b/easy-deploy-script/easy-deploy-viral-ngs.sh @@ -351,11 +351,23 @@ function create_project(){ fi if [ -z "$OMIT_UGER_PROJECT_FILES" ]; then - if [ ! -L "$PROJECT_PATH/run-pipe_UGER.sh" ]; then - ln -s "$VIRAL_NGS_PATH/pipes/Broad_UGER/run-pipe.sh" "$PROJECT_PATH/run-pipe_UGER.sh" - fi - if [ ! 
-L "$PROJECT_PATH/run-pipe.sh" ]; then - ln -s "run-pipe_UGER.sh" "$PROJECT_PATH/run-pipe.sh" + # environment var JOB_ID is defined on UGER + # if we are not running this on UGER, assume local execution + if [ -z "$JOB_ID" ]; then + if [ ! -L "$PROJECT_PATH/run-pipe_local.sh" ]; then + ln -s "$VIRAL_NGS_PATH/pipes/run-pipe_local.sh" "$PROJECT_PATH/run-pipe_local.sh" + fi + if [ ! -L "$PROJECT_PATH/run-pipe.sh" ]; then + ln -s "$PROJECT_PATH/run-pipe_local.sh" "$PROJECT_PATH/run-pipe.sh" + fi + # if we ARE running this on UGER, link in the UGER run-pipe script + else + if [ ! -L "$PROJECT_PATH/run-pipe_UGER.sh" ]; then + ln -s "$VIRAL_NGS_PATH/pipes/Broad_UGER/run-pipe.sh" "$PROJECT_PATH/run-pipe_UGER.sh" + fi + if [ ! -L "$PROJECT_PATH/run-pipe.sh" ]; then + ln -s "run-pipe_UGER.sh" "$PROJECT_PATH/run-pipe.sh" + fi fi fi diff --git a/pipes/Broad_LSF/cluster-submitter.py b/pipes/Broad_LSF/cluster-submitter.py index be5fd4f5c..03237c452 100755 --- a/pipes/Broad_LSF/cluster-submitter.py +++ b/pipes/Broad_LSF/cluster-submitter.py @@ -26,7 +26,7 @@ cmdline += "-oo {logdir}/LSF-{jobname}.txt ".format(logdir=LOGDIR, jobname=jobname) # pass memory resource request to LSF -mem = props.get('resources', {}).get('mem') +mem = (int(props.get('resources', {}).get('mem_mb') or 0) + 999) // 1000 if mem: cmdline += '-R "rusage[mem={}]" -M {} '.format(mem, 2 * int(mem)) diff --git a/pipes/Broad_LSF/run-pipe.sh b/pipes/Broad_LSF/run-pipe.sh index 14cd9b107..9571c75d4 100755 --- a/pipes/Broad_LSF/run-pipe.sh +++ b/pipes/Broad_LSF/run-pipe.sh @@ -22,7 +22,7 @@ export PATH="$MINICONDADIR/bin:$PATH" source activate "$CONDAENVDIR" # invoke Snakemake in cluster mode with custom wrapper scripts -snakemake --timestamp --rerun-incomplete --keep-going --nolock \ +snakemake --rerun-incomplete --keep-going --nolock \ --jobs 100000 --immediate-submit \ --latency-wait 20 \ --config mode=LSF job_profiler="$BINDIR/pipes/Broad_LSF/lsf-report.py" \ diff --git a/pipes/Broad_UGER/cluster-submitter.py
b/pipes/Broad_UGER/cluster-submitter.py index 70b56303b..eb9084ffa 100755 --- a/pipes/Broad_UGER/cluster-submitter.py +++ b/pipes/Broad_UGER/cluster-submitter.py @@ -37,7 +37,7 @@ def hard_blacklist_node(node): cmdline += "-o {logdir} -j y ".format(logdir=LOGDIR) # pass memory resource request to cluster -mem = props.get('resources', {}).get('mem') +mem = (int(props.get('resources', {}).get('mem_mb') or 0) + 999) // 1000 threads = props.get('resources', {}).get('threads') threads = threads or 1 if mem: diff --git a/pipes/Broad_UGER/run-pipe.sh b/pipes/Broad_UGER/run-pipe.sh index a0c617f07..8fe0360ad 100755 --- a/pipes/Broad_UGER/run-pipe.sh +++ b/pipes/Broad_UGER/run-pipe.sh @@ -41,7 +41,7 @@ source activate "$CONDAENVDIR" ARGS="" [[ $IMMEDIATE_SUBMIT -eq 1 ]] && ARGS+=" --immediate-submit --notemp " # invoke Snakemake in cluster mode with custom wrapper scripts -snakemake --timestamp --rerun-incomplete --keep-going --nolock \ +snakemake --rerun-incomplete --keep-going --nolock \ $ARGS \ --jobs 90 \ --force-use-threads \ diff --git a/pipes/rules/assembly.rules b/pipes/rules/assembly.rules index f4810258c..4c08db58c 100644 --- a/pipes/rules/assembly.rules +++ b/pipes/rules/assembly.rules @@ -43,7 +43,7 @@ rule assemble_trinity: fasta = config["tmp_dir"] +'/'+config["subdirs"]["assembly"]+'/{sample}.assembly1-trinity.fasta', subsamp_bam = config["tmp_dir"]+'/'+config["subdirs"]["assembly"]+'/{sample}.subsamp.bam' resources: - mem = 7, + mem_mb = 7*1000, threads = int(config.get("number_of_threads", 1)) params: LSF = config.get('LSF_queues', {}).get('short', '-W 4:00'), @@ -63,12 +63,14 @@ rule assemble_spades: clipDb = objectify_remote(config["trim_clip_db"]) output: contigs_spades=config["tmp_dir"] +'/'+config["subdirs"]["assembly"]+'/{sample}.assembly1-spades.fasta' resources: - mem=12, + mem_mb=18*1000, threads=int(config.get("number_of_threads", 1)) params: n_reads=str(config["spades_n_reads"]), logid="{sample}" run: - shell("{config[bin_dir]}/assembly.py assemble_spades
{input.taxfilt_reads} {input.clipDb} {output.contigs_spades} --nReads {params.n_reads} --threads {resources.threads} --memLimitGb {resources.mem}") + # give spades a bit less than the rule specifies (mem_mb is in MB; --memLimitGb takes GB) + mem = int(resources.mem_mb * 0.90 / 1000) + shell("{config[bin_dir]}/assembly.py assemble_spades {input.taxfilt_reads} {input.clipDb} {output.contigs_spades} --nReads {params.n_reads} --threads {resources.threads} --memLimitGb {mem}") rule assemble_trinity_spades: @@ -79,12 +81,12 @@ rule assemble_trinity_spades: contigs_trinity=config["tmp_dir"] +'/'+config["subdirs"]["assembly"]+'/{sample}.assembly1-trinity.fasta' output: contigs_trinity_spades=config["tmp_dir"] +'/'+config["subdirs"]["assembly"]+'/{sample}.assembly1-trinity-spades.fasta' resources: - mem=12, + mem_mb=12*1000, threads=int(config.get("number_of_threads", 1)) params: n_reads=str(config["spades_n_reads"]), logid="{sample}" run: - shell("{config[bin_dir]}/assembly.py assemble_spades {input.taxfilt_reads} {input.clipDb} {output.contigs_trinity_spades} --contigsUntrusted {input.contigs_trinity} --nReads {params.n_reads} --threads {resources.threads} --memLimitGb {resources.mem}") + shell("{config[bin_dir]}/assembly.py assemble_spades {input.taxfilt_reads} {input.clipDb} {output.contigs_trinity_spades} --contigsUntrusted {input.contigs_trinity} --nReads {params.n_reads} --threads {resources.threads} --memLimitGb $(({resources.mem_mb}/1000))") rule orient_and_impute: @@ -114,7 +116,7 @@ rule orient_and_impute: output: fasta = config["tmp_dir"]+'/'+config["subdirs"]["assembly"]+'/{sample}.assembly3-modify.fasta' resources: - mem = 12, + mem_mb = 12*1000, threads = int(config.get("number_of_threads", 1)) params: LSF = config.get('LSF_queues', {}).get('short', '-W 4:00'), @@ -143,7 +145,7 @@ rule orient_and_impute: ono_extra = " ".join(ono_extra) n_genome_segments = len(config["accessions_for_ref_genome_build"]) shell("{config[bin_dir]}/assembly.py order_and_orient {input.fasta} {input.refsel_genomes} {params.scaffolded_fasta}
{ono_extra} --outAlternateContigs {params.alternate_fasta} --nGenomeSegments {n_genome_segments} --outReference {params.scaffold_ref} --threads {resources.threads}") - shell("{config[bin_dir]}/assembly.py gapfill_gap2seq {params.scaffolded_fasta} {input.cleaned_reads} {params.gapfilled_fasta} --memLimitGb {resources[mem]} --maskErrors --randomSeed {config[random_seed]}") + shell("{config[bin_dir]}/assembly.py gapfill_gap2seq {params.scaffolded_fasta} {input.cleaned_reads} {params.gapfilled_fasta} --memLimitGb $(({resources[mem_mb]}/1000)) --maskErrors --randomSeed {config[random_seed]}") shell("{config[bin_dir]}/assembly.py impute_from_reference {params.gapfilled_fasta} {params.scaffold_ref} {output.fasta} --newName {params.renamed_prefix}{wildcards.sample} --replaceLength {params.replace_length} --minLengthFraction {params.length} --minUnambig {params.min_unambig} --index") rule refine_assembly_1: @@ -165,7 +167,7 @@ rule refine_assembly_1: config["tmp_dir"] +'/'+config["subdirs"]["assembly"]+'/{sample}.assembly4-refined.fasta', config["tmp_dir"] +'/'+config["subdirs"]["assembly"]+'/{sample}.assembly3.vcf.gz' resources: - mem = 7, + mem_mb = 7*1000, threads = int(config.get("number_of_threads", 1)) params: LSF = config.get('LSF_queues', {}).get('short', '-W 4:00'), @@ -191,7 +193,7 @@ rule refine_assembly_2: config["data_dir"]+'/'+config["subdirs"]["assembly"]+'/{sample}.fasta', config["tmp_dir"] +'/'+config["subdirs"]["assembly"]+'/{sample}.assembly4.vcf.gz' resources: - mem = 7, + mem_mb = 7*1000, threads = int(config.get("number_of_threads", 1)) params: LSF = config.get('LSF_queues', {}).get('short', '-W 4:00'), @@ -216,7 +218,7 @@ rule map_reads_to_self: config["data_dir"]+'/'+config["subdirs"]["align_self"]+'/{sample}.bam', config["data_dir"]+'/'+config["subdirs"]["align_self"]+'/{sample}.mapped.bam' resources: - mem = 4, + mem_mb = 4*1000, threads = int(config.get("number_of_threads", 1)) params: LSF = config.get('LSF_queues', {}).get('short', '-W 4:00'), diff --git
a/pipes/rules/demux.rules b/pipes/rules/demux.rules index 11b4dc562..8a3931f73 100644 --- a/pipes/rules/demux.rules +++ b/pipes/rules/demux.rules @@ -83,7 +83,7 @@ rule illumina_demux: config['reports_dir']+'/barcodes/barcodes-metrics-{flowcell}.{lane}.txt', config['reports_dir']+'/barcodes/common-barcodes-{flowcell}.{lane}.txt' resources: - mem = 8, + mem_mb = 8*1000, threads = 16 params: LSF = config.get('LSF_queues', {}).get('bigmem', '-q flower'), @@ -95,7 +95,7 @@ rule illumina_demux: shutil.rmtree(outdir, ignore_errors=True) makedirs(set(map(os.path.dirname, output))) lane = get_one_lane_from_run(wildcards.flowcell, wildcards.lane, config['seqruns_demux']) - opts = '--threads={} --JVMmemory={}g'.format(resources.threads, resources.mem) + opts = '--threads={} --JVMmemory={}m'.format(resources.threads, resources.mem_mb) for opt in ('minimum_base_quality', 'max_mismatches', 'min_mismatch_delta', 'max_no_calls', 'read_structure', 'minimum_quality', 'run_start_date'): if lane.get(opt): opts += ' --%s=%s' % (opt, lane[opt]) diff --git a/pipes/rules/hs_deplete.rules b/pipes/rules/hs_deplete.rules index 1f7f6adf2..04f67e7e7 100644 --- a/pipes/rules/hs_deplete.rules +++ b/pipes/rules/hs_deplete.rules @@ -34,7 +34,7 @@ rule depletion: config["tmp_dir"] +'/'+config["subdirs"]["depletion"]+'/{sample}.rmdup.bam', config["data_dir"]+'/'+config["subdirs"]["depletion"]+'/{sample}.cleaned.bam' resources: - mem = 15, + mem_mb = 15*1000, threads = int(config.get("number_of_threads", 1)) params: LSF = config.get('LSF_queues', {}).get('long', '-q forest'), @@ -45,11 +45,11 @@ rule depletion: makedirs(expand("{dir}/{subdir}", dir=[config["data_dir"],config["tmp_dir"]], subdir=config["subdirs"]["depletion"])) - mem_mb = int((resources.mem * 1000) * 0.95) + mem_mb = int(resources.mem_mb * 0.95) blast_db_prefixes=" ".join(set([strip_protocol(dbf, relative=True) for dbf in config["blast_db_remove"]])) bmtagger_db_prefixes=" ".join(set([strip_protocol(dbf, relative=True) for dbf in
config["bmtagger_dbs_remove"]])) bwa_db_prefixes=" ".join(set([strip_protocol(dbf, relative=True) for dbf in config["bwa_dbs_remove"]])) - shell("{config[bin_dir]}/taxon_filter.py deplete {input.input_bam} {params.revert_bam} {output} --bwaDbs {bwa_db_prefixes} --bmtaggerDbs {bmtagger_db_prefixes} --blastDbs {blast_db_prefixes} --threads {resources.threads} --srprismMemory {mem_mb} --JVMmemory 15g") + shell("{config[bin_dir]}/taxon_filter.py deplete {input.input_bam} {params.revert_bam} {output} --bwaDbs {bwa_db_prefixes} --bmtaggerDbs {bmtagger_db_prefixes} --blastDbs {blast_db_prefixes} --threads {resources.threads} --srprismMemory {mem_mb} --JVMmemory {resources.mem_mb}m") os.unlink(params.revert_bam) rule filter_to_taxon: @@ -62,7 +62,7 @@ rule filter_to_taxon: output: config["data_dir"]+'/'+config["subdirs"]["depletion"]+'/{sample}.taxfilt.bam' resources: - mem = 7 + mem_mb = 7*1000 params: LSF = config.get('LSF_queues', {}).get('short', '-W 4:00'), UGER = config.get('UGER_queues', {}).get('short', '-l h_rt=2:00:00'), @@ -122,16 +122,17 @@ rule merge_one_per_sample: output: config["data_dir"]+'/'+config["subdirs"]["per_sample"] +'/{sample}.{adjective,raw|cleaned|taxfilt}.bam' resources: - mem = 10 + mem_mb = 10*1000 params: LSF = config.get('LSF_queues', {}).get('short', '-W 4:00'), UGER = merge_one_per_sample_queue, logid = "{sample}-{adjective}", tmpf_bam = config["tmp_dir"]+'/'+config["subdirs"]["depletion"] +'/{sample}.{adjective}.bam' run: + mem_mb = int(resources.mem_mb * 0.90) makedirs(config["data_dir"]+'/'+config["subdirs"]["per_sample"]) if wildcards.adjective == 'raw': shell("{config[bin_dir]}/read_utils.py merge_bams {input} {output} --picardOptions SORT_ORDER=queryname") else: shell("{config[bin_dir]}/read_utils.py merge_bams {input} {params.tmpf_bam} --picardOptions SORT_ORDER=queryname") - shell("{config[bin_dir]}/read_utils.py rmdup_mvicuna_bam {params.tmpf_bam} {output} --JVMmemory 8g") + shell("{config[bin_dir]}/read_utils.py 
rmdup_mvicuna_bam {params.tmpf_bam} {output} --JVMmemory {mem_mb}m") diff --git a/pipes/rules/interhost.rules b/pipes/rules/interhost.rules index 0a8e0a1fe..e51e2d982 100644 --- a/pipes/rules/interhost.rules +++ b/pipes/rules/interhost.rules @@ -40,7 +40,7 @@ rule ref_guided_consensus: config["data_dir"]+'/'+config["subdirs"]["align_ref"]+'/{sample}.vcf.gz', config["data_dir"]+'/'+config["subdirs"]["align_ref"]+'/{sample}.fasta' resources: - mem = 4, + mem_mb = 4*1000, threads = int(config.get("number_of_threads", 1)) params: LSF = config.get('LSF_queues', {}).get('short', '-W 4:00'), @@ -60,7 +60,7 @@ rule ref_guided_consensus_aligned_with_dups: output: config["data_dir"]+'/'+config["subdirs"]["align_ref"]+'/{sample}.realigned.only_aligned.bam' resources: - mem = 8, + mem_mb = 8*1000, threads = int(config.get("number_of_threads", 1)) params: LSF = config.get('LSF_queues', {}).get('short', '-W 4:00'), @@ -85,7 +85,7 @@ rule ref_guided_diversity: os.path.join(config["data_dir"], config["subdirs"]["interhost"], 'ref_guided.fasta'), os.path.join(config["data_dir"], config["subdirs"]["interhost"], 'ref_guided.vcf.gz') resources: - mem=8 + mem_mb=8*1000 params: LSF = config.get('LSF_queues', {}).get('short', '-W 4:00'), UGER = config.get('UGER_queues', {}).get('short', '-l h_rt=04:00:00'), @@ -95,7 +95,7 @@ rule ref_guided_diversity: shell("cat " + " ".join(input.in_fastas) + " > {output[0]}") merge_vcfs(input.in_vcfs, os.path.splitext(input.ref_genome_files[0])[0]+".fasta", output[1]) -def multi_align_mafft_memory(_, input): +def multi_align_mafft_memory_mb(_, input): sample_count = len(list(read_samples_file(config["samples_assembly"]))) if sample_count in range(0,50): mem=8 @@ -103,7 +103,7 @@ def multi_align_mafft_memory(_, input): mem=16 elif sample_count >= 100: mem=32 - return mem + return mem*1000 rule multi_align_mafft: input: @@ -119,7 +119,7 @@ rule multi_align_mafft: subdir=config["subdirs"]["multialign_ref"], chrom=range(1, 
len(config["accessions_for_ref_genome_build"])+1)) resources: - mem = multi_align_mafft_memory, + mem_mb = multi_align_mafft_memory_mb, threads = int(config.get("number_of_threads", 1)) params: LSF = config.get('LSF_queues', {}).get('short', '-W 4:00'), diff --git a/pipes/rules/intrahost.rules b/pipes/rules/intrahost.rules index b2113edd9..ab2fe84d6 100644 --- a/pipes/rules/intrahost.rules +++ b/pipes/rules/intrahost.rules @@ -19,7 +19,7 @@ rule isnvs_per_sample: output: config["data_dir"]+'/'+config["subdirs"]["intrahost"] +'/vphaser2.{sample}.txt.gz' resources: - mem = 7, + mem_mb = 7*1000, threads = int(config.get("number_of_threads", 1)) params: LSF = config.get('LSF_queues', {}).get('short', '-W 4:00'), @@ -50,7 +50,7 @@ rule isnvs_vcf: annotation_text = config["data_dir"]+'/'+config["subdirs"]["intrahost"]+'/isnvs.annot.txt.gz', annotated_vcf_index = config["data_dir"]+'/'+config["subdirs"]["intrahost"]+'/isnvs.annot.vcf.gz.tbi' resources: - mem=4 + mem_mb=4*1000 params: LSF = config.get('LSF_queues', {}).get('short', '-W 4:00'), UGER = config.get('UGER_queues', {}).get('short', '-l h_rt=04:00:00'), @@ -86,7 +86,7 @@ rule isnvs_vcf_filtered: annotation_text = config["data_dir"]+'/'+config["subdirs"]["intrahost"] +'/isnvs.filtered.annot.txt.gz', annotated_vcf_index = config["data_dir"]+'/'+config["subdirs"]["intrahost"]+'/isnvs.annot.vcf.gz.tbi' resources: - mem=4 + mem_mb=4*1000 params: LSF = config.get('LSF_queues', {}).get('short', '-W 4:00'), UGER = config.get('UGER_queues', {}).get('short', '-l h_rt=04:00:00'), diff --git a/pipes/rules/metagenomics.rules b/pipes/rules/metagenomics.rules index 09a37b378..3baf5a3e1 100644 --- a/pipes/rules/metagenomics.rules +++ b/pipes/rules/metagenomics.rules @@ -61,7 +61,7 @@ rule diamond: reads = os.path.join(config["data_dir"], config["subdirs"]["metagenomics"], "{sample}.{adjective,raw|cleaned}.diamond.lca.gz") resources: threads = int(config.get("number_of_threads", 1)), - mem = 120 + mem_mb = 120*1000 params: UGER = 
config.get('UGER_queues', {}).get('long', '-l h_rt=36:00:00') run: @@ -82,13 +82,13 @@ rule align_rna: nodupes_lca = os.path.join(config["data_dir"], config["subdirs"]["metagenomics"], "{sample}.{adjective,raw|cleaned}.rna_bwa.lca_nodupes.gz") resources: threads = int(config.get("number_of_threads", 1)), - mem = 7 + mem_mb = 7*1000 params: UGER = config.get('UGER_queues', {}).get('long', '-l h_rt=36:00:00') run: rna_bwa_path_prefix = strip_protocol(config["align_rna_db"], relative=True) taxonomy_db_prefix = strip_protocol(config["taxonomy_db"], relative=True) - shell("{config[bin_dir]}/metagenomics.py align_rna {input.bam} "+rna_bwa_path_prefix+" "+taxonomy_db_prefix+" {output.nodupes_report} --dupeReport {output.report} --outBam {output.bam} --outReads {output.nodupes_lca} --dupeReads {output.lca} --JVMmemory {resources.mem}g --threads {resources.threads}") + shell("{config[bin_dir]}/metagenomics.py align_rna {input.bam} "+rna_bwa_path_prefix+" "+taxonomy_db_prefix+" {output.nodupes_report} --dupeReport {output.report} --outBam {output.bam} --outReads {output.nodupes_lca} --dupeReads {output.lca} --JVMmemory {resources.mem_mb}m --threads {resources.threads}") if config['kraken_execution'] == 'multiple': kraken_samples = [] @@ -135,7 +135,7 @@ if config['kraken_execution'] == 'multiple': reads = all_kraken_reads['raw'] resources: threads = int(config.get("number_of_threads", 1)), - mem = 120 + mem_mb = 120*1000 params: UGER = config.get('UGER_queues', {}).get('long', '-l h_rt=36:00:00'), kraken_db = config['kraken_db'], @@ -151,7 +151,7 @@ if config['kraken_execution'] == 'multiple': reads = all_kraken_reads['cleaned'] resources: threads = int(config.get("number_of_threads", 1)), - mem = 120 + mem_mb = 120*1000 params: UGER = config.get('UGER_queues', {}).get('long', '-l h_rt=36:00:00'), kraken_db = config['kraken_db'], @@ -168,7 +168,7 @@ else: reads = os.path.join(config["data_dir"], config["subdirs"]["metagenomics"], 
"{sample}.{adjective,raw|cleaned}.kraken.reads.gz") resources: threads = int(config.get("number_of_threads", 1)), - mem = 120 + mem_mb = 120*1000 params: UGER = config.get('UGER_queues', {}).get('long', '-l h_rt=36:00:00'), kraken_db = config['kraken_db'], @@ -203,7 +203,7 @@ rule krona_import_taxonomy: output: os.path.join(config["data_dir"], config["subdirs"]["metagenomics"], "{sample}.{adjective,raw|cleaned}.{method,kraken|diamond|rna_bwa|rna_bwa_nodupes}.krona.html") resources: - mem=32 + mem_mb=32*1000 run: krona_db_prefix = strip_protocol(config["krona_db"], relative=True) shell("{config[bin_dir]}/metagenomics.py krona {input.tsv} "+krona_db_prefix+" {output} --noRank") diff --git a/pipes/rules/ncbi.rules b/pipes/rules/ncbi.rules index e5a500051..8f87f4d54 100644 --- a/pipes/rules/ncbi.rules +++ b/pipes/rules/ncbi.rules @@ -74,7 +74,7 @@ rule annot_transfer: samp=read_samples_file(config["samples_assembly"]), chrom=range(1, len(config["accessions_for_ref_genome_build"])+1)) resources: - mem = 4 + mem_mb = 4*1000 params: LSF = config.get('LSF_queues', {}).get('short', '-W 4:00'), UGER = config.get('UGER_queues', {}).get('short', '-l h_rt=04:00:00') diff --git a/pipes/rules/reports.rules b/pipes/rules/reports.rules index 0e0991445..24bc7fc84 100644 --- a/pipes/rules/reports.rules +++ b/pipes/rules/reports.rules @@ -48,7 +48,7 @@ rule fastqc_report: output: config["reports_dir"]+'/fastqc/{sample}/{adjective}' resources: - mem = 3 + mem_mb = 3*1000 params: LSF = config.get('LSF_queues', {}).get('short', '-W 4:00'), UGER = config.get('UGER_queues', {}).get('short', '-l h_rt=04:00:00'), @@ -99,7 +99,7 @@ if config.get("spikeins_db"): output: config["reports_dir"]+'/spike_count/{sample}.spike_count.txt' resources: - mem = 3 + mem_mb = 3*1000 params: LSF = config.get('LSF_queues', {}).get('short', '-W 4:00'), UGER = config.get('UGER_queues', {}).get('short', '-l h_rt=04:00:00'), diff --git a/pipes/run-pipe_local.sh b/pipes/run-pipe_local.sh new file mode 100644 index 
000000000..6e61aab6a --- /dev/null +++ b/pipes/run-pipe_local.sh @@ -0,0 +1,50 @@ +#!/bin/bash +# Wrappers around Snakemake for execution on a single instance + +# determine the directory of this script +SCRIPT_DIRECTORY=$(dirname $(readlink --canonicalize-existing $0)) + +# if a conda environment is active, deactivate it +# if [[ ! -z "${CONDA_PREFIX}" ]]; then +# echo "deactivating env: $CONDA_PREFIX" +# source deactivate +# fi + +python_to_use="$(command -v python3 || command -v python)" +if [ $? -ne 0 ]; then + echo "It looks like Python is not installed. Exiting." + if [[ $sourced -eq 0 ]]; then + exit 1 + else + return 1 + fi +fi + +python3_check=$(hash python3 &> /dev/null) +if [ $? -eq 0 ]; then + python_to_use="$(which python3)" +fi + +$python_to_use --version + +# load config dirs from config.yaml. After using the conda dotkit, we should have PyYAML +CONDAENVDIR=`$python_to_use -c "import yaml, os;f=open(\"config.yaml\");print(os.path.realpath(yaml.safe_load(f)['conda_env_dir']));f.close()"` +MINICONDADIR=`$python_to_use -c 'import yaml; import os; f=open("config.yaml");print(os.path.realpath(yaml.safe_load(f)["miniconda_dir"]));f.close()'` +BINDIR=`$python_to_use -c "import yaml, os;f=open(\"config.yaml\");print(os.path.realpath(yaml.safe_load(f)['bin_dir']));f.close()"` +DATADIR=`$python_to_use -c "import yaml, os; f=open(\"config.yaml\");print(os.path.realpath(yaml.safe_load(f)['data_dir']));f.close()"` +LOGDIR=`$python_to_use -c "import yaml, os; f=open(\"config.yaml\");print(os.path.realpath(yaml.safe_load(f)['log_dir']));f.close()"` + +#export PATH="$MINICONDADIR/bin:$PATH" + +# load conda environment +#source activate "$CONDAENVDIR" + +ARGS="" +# invoke Snakemake locally on this machine (no cluster submission) +snakemake --rerun-incomplete --keep-going --nolock \ + $ARGS \ + --latency-wait 20 \ + --directory .
\ + --resources mem_mb=$(expr $(cat /proc/meminfo | grep MemTotal | awk '{print $2}') / 1000) \ + --cores $(expr $(grep -c ^processor /proc/cpuinfo) - 1) \ + "$@" | tee "$LOGDIR/snakemake_$(date +%F_%s).log" \ No newline at end of file