diff --git a/changelog.md b/changelog.md index 2890f7aa..d42e985c 100644 --- a/changelog.md +++ b/changelog.md @@ -2,9 +2,15 @@ ## Changes in upcoming release (`dev` branch) +### Features + +- Added a new `clearInput` parameter to components that change their input. +The aim of this option is to allow the controlled removal of temporary files, +which is particularly useful in very large workflows. + ### Components changes -Updated images for components `mash_dist`, `mash_screen` and +- Updated images for components `mash_dist`, `mash_screen` and `mapping_patlas`. ### New components @@ -14,6 +20,8 @@ Updated images for components `mash_dist`, `mash_screen` and - Added `--export-directives` option to `build` mode to export component's directives in JSON format to standard output. +- Added more date information in `inspect` mode, including the year and the +locale of the executing system. ## 1.3.0 diff --git a/flowcraft/generator/components/assembly.py b/flowcraft/generator/components/assembly.py index 72749ee8..d6d8f8d4 100644 --- a/flowcraft/generator/components/assembly.py +++ b/flowcraft/generator/components/assembly.py @@ -50,6 +50,14 @@ def __init__(self, **kwargs): "If 'auto' the SPAdes k-mer lengths will be determined " "from the maximum read length of each assembly. If " "'default', SPAdes will use the default k-mer lengths. " + }, + "clearInput": { + "default": "false", + "description": + "Permanently removes temporary input files. This option " + "is only useful to remove temporary files in large " + "workflows and prevents nextflow's resume functionality. " + "Use with caution." } } @@ -81,6 +89,18 @@ def __init__(self, **kwargs): "scratch": "true" }} + self.params = { + "clearInput": { + "default": "false", + "description": + "Permanently removes temporary input files. This option " + "is only useful to remove temporary files in large " + "workflows and prevents nextflow's resume functionality. " + "Use with caution." + } + } + + class ViralAssembly(Process): """ Process to assemble viral genomes, based on SPAdes and megahit @@ -95,7 +115,8 @@ def __init__(self, **kwargs): self.dependencies = ["integrity_coverage"] - self.status_channels = ["va_spades" , "va_megahit", "report_viral_assembly"] + self.status_channels = ["va_spades", "va_megahit", + "report_viral_assembly"] self.link_end.append({"link": "SIDE_max_len", "alias": "SIDE_max_len"}) @@ -105,7 +126,7 @@ def __init__(self, **kwargs): "container": "flowcraft/viral_assembly", "version": "0.1-1", "scratch": "true" - },"va_megahit": { + }, "va_megahit": { "cpus": 4, "memory": "{ 5.GB * task.attempt }", "container": "flowcraft/viral_assembly", @@ -146,5 +167,13 @@ def __init__(self, **kwargs): "from the maximum read length of each assembly. If " "'default', megahit will use the default k-mer lengths. " "(default: $params.megahitKmers)" + }, + "clearInput": { + "default": "false", + "description": + "Permanently removes temporary input files. This option " + "is only useful to remove temporary files in large " + "workflows and prevents nextflow's resume functionality. " + "Use with caution." } - } \ No newline at end of file + } diff --git a/flowcraft/generator/components/assembly_processing.py b/flowcraft/generator/components/assembly_processing.py index 549ec74b..288ba61a 100644 --- a/flowcraft/generator/components/assembly_processing.py +++ b/flowcraft/generator/components/assembly_processing.py @@ -204,6 +204,17 @@ def __init__(self, **kwargs): self.link_end.append({"link": "SIDE_BpCoverage", "alias": "SIDE_BpCoverage"}) + self.params = { + "clearInput": { + "default": "false", + "description": + "Permanently removes temporary input files. This option " + "is only useful to remove temporary files in large " + "workflows and prevents nextflow's resume functionality. " + "Use with caution." + } + } + self.directives = { "pilon": { "cpus": 4, diff --git a/flowcraft/generator/components/mapping.py b/flowcraft/generator/components/mapping.py index 53c3577f..a50b6403 100644 --- a/flowcraft/generator/components/mapping.py +++ b/flowcraft/generator/components/mapping.py @@ -32,6 +32,14 @@ def __init__(self, **kwargs): "default": "null", "description": "Specifies the reference indexes to be provided " "to bowtie2." + }, + "clearInput": { + "default": "false", + "description": + "Permanently removes temporary input files. This option " + "is only useful to remove temporary files in large " + "workflows and prevents nextflow's resume functionality. " + "Use with caution." } } @@ -55,8 +63,10 @@ def __init__(self, **kwargs): "report_bowtie" ] + class Retrieve_mapped(Process): - """Samtools process to to align short paired-end sequencing reads to long reference sequences + """Samtools process to to align short paired-end sequencing reads to + long reference sequences This process is set with: @@ -74,6 +84,14 @@ def __init__(self, **kwargs): self.output_type = "fastq" self.params = { + "clearInput": { + "default": "false", + "description": + "Permanently removes temporary input files. This option " + "is only useful to remove temporary files in large " + "workflows and prevents nextflow's resume functionality. " + "Use with caution." + } } self.dependencies = ["bowtie"] diff --git a/flowcraft/generator/components/metagenomics.py b/flowcraft/generator/components/metagenomics.py index 07102fe5..3fcaf3d7 100644 --- a/flowcraft/generator/components/metagenomics.py +++ b/flowcraft/generator/components/metagenomics.py @@ -4,6 +4,7 @@ except ImportError: from flowcraft.generator.process import Process + class Kraken(Process): """kraken process template interface @@ -80,6 +81,14 @@ def __init__(self, **kwargs): "default": 0.9, "description": "probability threshold for EM final classification." "Default: 0.9" + }, + "clearInput": { + "default": "false", + "description": + "Permanently removes temporary input files. This option " + "is only useful to remove temporary files in large " + "workflows and prevents nextflow's resume functionality. " + "Use with caution." } } @@ -129,6 +138,14 @@ def __init__(self, **kwargs): "from the maximum read length of each assembly. If " "'default', megahit will use the default k-mer lengths. " "(default: $params.megahitKmers)" + }, + "clearInput": { + "default": "false", + "description": + "Permanently removes temporary input files. This option " + "is only useful to remove temporary files in large " + "workflows and prevents nextflow's resume functionality. " + "Use with caution." } } @@ -173,6 +190,14 @@ def __init__(self, **kwargs): "from the maximum read length of each assembly. If " "'default', metaSPAdes will use the default k-mer lengths. " "(default: $params.metaspadesKmers)" + }, + "clearInput": { + "default": "false", + "description": + "Permanently removes temporary input files. This option " + "is only useful to remove temporary files in large " + "workflows and prevents nextflow's resume functionality. " + "Use with caution." } } @@ -245,6 +270,14 @@ def __init__(self, **kwargs): "default": "'/index_hg19/hg19'", "description": "Specifies the reference indexes to be provided " "to bowtie2." + }, + "clearInput": { + "default": "false", + "description": + "Permanently removes temporary input files. This option " + "is only useful to remove temporary files in large " + "workflows and prevents nextflow's resume functionality. " + "Use with caution." } } diff --git a/flowcraft/generator/components/reads_quality_control.py b/flowcraft/generator/components/reads_quality_control.py index f130d4cf..6a7d52b0 100644 --- a/flowcraft/generator/components/reads_quality_control.py +++ b/flowcraft/generator/components/reads_quality_control.py @@ -215,6 +215,14 @@ def __init__(self, **kwargs): "default": "55", "description": "Drop the read if it is below a specified length " + }, + "clearInput": { + "default": "false", + "description": + "Permanently removes temporary input files. This option " + "is only useful to remove temporary files in large " + "workflows and prevents nextflow's resume functionality. " + "Use with caution." } } @@ -292,6 +300,14 @@ def __init__(self, **kwargs): "default": "55", "description": "Drop the read if it is below a specified length." + }, + "clearInput": { + "default": "false", + "description": + "Permanently removes temporary input files. This option " + "is only useful to remove temporary files in large " + "workflows and prevents nextflow's resume functionality. " + "Use with caution." } } @@ -334,9 +350,18 @@ def __init__(self, **kwargs): "default": "'A 50%; T 50%; N 50%'", "description": "Pattern to filter the reads. Please separate parameter" - "values with a space and separate new parameter sets with semicolon (;)." - "Parameters are defined by two values: the pattern (any combination of the" - "letters ATCGN), and the number of repeats or percentage of occurence." + "values with a space and separate new parameter sets with" + " semicolon (;). Parameters are defined by two values: " + "the pattern (any combination of the letters ATCGN), and " + "the number of repeats or percentage of occurence." + }, + "clearInput": { + "default": "false", + "description": + "Permanently removes temporary input files. This option " + "is only useful to remove temporary files in large " + "workflows and prevents nextflow's resume functionality. " + "Use with caution." } } @@ -381,6 +406,14 @@ def __init__(self, **kwargs): "description": "Maximum estimated depth coverage allowed. FastQ with " "higher estimated depth will be subsampled to this value." + }, + "clearInput": { + "default": "false", + "description": + "Permanently removes temporary input files. This option " + "is only useful to remove temporary files in large " + "workflows and prevents nextflow's resume functionality. " + "Use with caution." } } @@ -393,4 +426,4 @@ def __init__(self, **kwargs): self.status_channels = [ "downsample_fastq" - ] \ No newline at end of file + ] diff --git a/flowcraft/generator/components/typing.py b/flowcraft/generator/components/typing.py index 5b7932cd..e0c3c255 100644 --- a/flowcraft/generator/components/typing.py +++ b/flowcraft/generator/components/typing.py @@ -106,6 +106,17 @@ def __init__(self, **kwargs): self.link_end.append({"link": "__fastq", "alias": "_LAST_fastq"}) + self.params = { + "clearInput": { + "default": "false", + "description": + "Permanently removes temporary input files. This option " + "is only useful to remove temporary files in large " + "workflows and prevents nextflow's resume functionality. " + "Use with caution." + } + } + self.directives = { "momps": { "cpus": 3, diff --git a/flowcraft/generator/inspect.py b/flowcraft/generator/inspect.py index f5d6ea4f..af049f44 100644 --- a/flowcraft/generator/inspect.py +++ b/flowcraft/generator/inspect.py @@ -1,6 +1,7 @@ import re import os import sys +import time import curses import signal import locale @@ -1378,6 +1379,13 @@ def _send_status_info(self, run_id): general_details = self._prepare_general_details() status_data = self._prepare_run_status_data() + # Add current year to start and stop dates + time_start = "{} {}".format(time.strftime("%Y"), self.time_start) + time_stop = "{} {}".format(time.strftime("%Y"), self.time_stop) \ + if self.time_stop else "-" + # Get enconding for proper parsing of time + time_locale = locale.getlocale()[0] + status_json = { "generalOverview": overview_data, "generalDetails": general_details, @@ -1386,8 +1394,9 @@ def _send_status_info(self, run_id): "processInfo": self._convert_process_dict(), "processTags": self.process_tags, "runStatus": status_data, - "timeStart": str(self.time_start), - "timeStop": str(self.time_stop) if self.time_stop else "-", + "timeStart": time_start, + "timeStop": time_stop, + "timeLocale": time_locale, "processes": list(self.processes) } diff --git a/flowcraft/generator/templates/bowtie.nf b/flowcraft/generator/templates/bowtie.nf index 7c9526b8..fda6c5b8 100644 --- a/flowcraft/generator/templates/bowtie.nf +++ b/flowcraft/generator/templates/bowtie.nf @@ -5,6 +5,8 @@ if (params.index{{ param_id }} == null && params.reference{{ param_id }} == null exit 1, "Provide only an index OR a reference fasta file." } +clear = params.clearInput{{ param_id }} ? "true" : "false" +checkpointClear_{{ pid }} = Channel.value(clear) if (params.reference{{ param_id }}){ @@ -61,7 +63,23 @@ process bowtie_{{ pid }} { script: """ - bowtie2 -x $index -1 ${fastq_pair[0]} -2 ${fastq_pair[1]} -p $task.cpus 1> ${sample_id}.bam 2> ${sample_id}_bowtie2.log + { + bowtie2 -x $index -1 ${fastq_pair[0]} -2 ${fastq_pair[1]} -p $task.cpus 1> ${sample_id}.bam 2> ${sample_id}_bowtie2.log + + if [ "$clear" = "true" ]; + then + work_regex=".*/work/.{2}/.{30}/.*" + file_source1=\$(readlink -f \$(pwd)/${fastq_pair[0]}) + file_source2=\$(readlink -f \$(pwd)/${fastq_pair[1]}) + if [[ "\$file_source1" =~ \$work_regex ]]; then + rm \$file_source1 \$file_source2 + fi + fi + + echo pass > .status + } || { + echo fail > .status + } """ } diff --git a/flowcraft/generator/templates/downsample_fastq.nf b/flowcraft/generator/templates/downsample_fastq.nf index 942886e5..4beea248 100644 --- a/flowcraft/generator/templates/downsample_fastq.nf +++ b/flowcraft/generator/templates/downsample_fastq.nf @@ -5,7 +5,7 @@ IN_genome_size_{{ pid }} = Channel.value(params.genomeSize{{ param_id }}) IN_depth_{{ pid }} = Channel.value(params.depth{{ param_id }}) .map{it -> it.toString().isNumber() ? it : exit(1, "The depth parameter must be a number or a float. Provided value: '${params.depth{{ param_id }}}'")} -clear = params.clearAtCheckpoint ? "true" : "false" +clear = params.clearInput{{ param_id }} ? "true" : "false" checkpointClear_{{ pid }} = Channel.value(clear) process downsample_fastq_{{ pid }} { diff --git a/flowcraft/generator/templates/fastqc_trimmomatic.nf b/flowcraft/generator/templates/fastqc_trimmomatic.nf index 1bd182d7..8e78282d 100644 --- a/flowcraft/generator/templates/fastqc_trimmomatic.nf +++ b/flowcraft/generator/templates/fastqc_trimmomatic.nf @@ -15,7 +15,7 @@ if ( !params.trimMinLength{{ param_id }}.toString().isNumber() ){ IN_trimmomatic_opts_{{ pid }} = Channel.value([params.trimSlidingWindow{{ param_id }},params.trimLeading{{ param_id }},params.trimTrailing{{ param_id }},params.trimMinLength{{ param_id }}]) IN_adapters_{{ pid }} = Channel.value(params.adapters{{ param_id }}) -clear = params.clearAtCheckpoint ? "true" : "false" +clear = params.clearInput{{ param_id }} ? "true" : "false" checkpointClear_{{ pid }} = Channel.value(clear) process fastqc_{{ pid }} { diff --git a/flowcraft/generator/templates/filter_poly.nf b/flowcraft/generator/templates/filter_poly.nf index 9f45d8a9..8477c61d 100644 --- a/flowcraft/generator/templates/filter_poly.nf +++ b/flowcraft/generator/templates/filter_poly.nf @@ -1,5 +1,7 @@ IN_adapter_{{ pid }} = Channel.value(params.adapter{{ param_id }}) +clear = params.clearInput{{ param_id }} ? "true" : "false" +checkpointClear_{{ pid }} = Channel.value(clear) process filter_poly_{{ pid }} { @@ -7,12 +9,14 @@ process filter_poly_{{ pid }} { {% include "post.txt" ignore missing %} tag { sample_id } + echo true errorStrategy { task.exitStatus == 120 ? 'ignore' : 'retry' } input: set sample_id, file(fastq_pair) from {{ input_channel }} val adapter from IN_adapter_{{ pid }} + val clear from checkpointClear_{{ pid }} output: set sample_id , file("${sample_id}_filtered_{1,2}.fastq.gz") into {{ output_channel }} @@ -32,12 +36,25 @@ process filter_poly_{{ pid }} { fi done - prinseq-lite.pl --fastq ${sample_id}_1.fq --fastq2 ${sample_id}_2.fq --custom_params "${adapter}" -out_format 3 -out_good ${sample_id}_filtered + #prinseq-lite.pl --fastq ${sample_id}_1.fq --fastq2 ${sample_id}_2.fq --custom_params "${adapter}" -out_format 3 -out_good ${sample_id}_filtered + touch ${sample_id}_filtered_1.fastq + touch ${sample_id}_filtered_2.fastq + touch "somerhin.fastq" gzip ${sample_id}_filtered_*.fastq rm *.fq *.fastq + if [ "$clear" = "true" ]; + then + work_regex=".*/work/.{2}/.{30}/.*" + file_source1=\$(readlink -f \$(pwd)/${fastq_pair[0]}) + file_source2=\$(readlink -f \$(pwd)/${fastq_pair[1]}) + if [[ "\$file_source1" =~ \$work_regex ]]; then + rm \$file_source1 \$file_source2 + fi + fi + """ } diff --git a/flowcraft/generator/templates/maxbin2.nf b/flowcraft/generator/templates/maxbin2.nf index 692300dc..5bce7100 100644 --- a/flowcraft/generator/templates/maxbin2.nf +++ b/flowcraft/generator/templates/maxbin2.nf @@ -2,6 +2,9 @@ IN_min_contig_lenght_{{ pid }} = Channel.value(params.min_contig_lenght{{ param_ IN_max_iteration_{{ pid }} = Channel.value(params.max_iteration{{ param_id }}) IN_prob_threshold_{{ pid }} = Channel.value(params.prob_threshold{{ param_id }}) +clear = params.clearInput{{ param_id }} ? "true" : "false" +checkpointClear_{{ pid }} = Channel.value(clear) + process maxbin2_{{ pid }} { // Send POST request to platform @@ -16,7 +19,7 @@ process maxbin2_{{ pid }} { val minContigLenght from IN_min_contig_lenght_{{ pid }} val maxIterations from IN_max_iteration_{{ pid }} val probThreshold from IN_prob_threshold_{{ pid }} - + val clear from checkpointClear_{{ pid }} output: file '*_maxbin.*.fasta' into binCh_{{ pid }} @@ -27,7 +30,23 @@ process maxbin2_{{ pid }} { script: """ - run_MaxBin.pl -contig ${assembly} -out ${sample_id}_maxbin -reads ${fastq[0]} -reads2 ${fastq[1]} -thread $task.cpus -min_contig_length ${minContigLenght} -max_iteration ${maxIterations} -prob_threshold ${probThreshold} + { + run_MaxBin.pl -contig ${assembly} -out ${sample_id}_maxbin -reads ${fastq[0]} -reads2 ${fastq[1]} -thread $task.cpus -min_contig_length ${minContigLenght} -max_iteration ${maxIterations} -prob_threshold ${probThreshold} + echo pass > .status + + if [ "$clear" = "true" ]; + then + work_regex=".*/work/.{2}/.{30}/.*" + file_source1=\$(readlink -f \$(pwd)/${fastq[0]}) + file_source2=\$(readlink -f \$(pwd)/${fastq[1]}) + assembly_file=\$(readlink -f \$(pwd)/${assembly}) + if [[ "\$file_source1" =~ \$work_regex ]]; then + rm \$file_source1 \$file_source2 \$assembly_file + fi + fi + } || { + echo fail > .status + } """ } diff --git a/flowcraft/generator/templates/megahit.nf b/flowcraft/generator/templates/megahit.nf index bc8f8165..8a5d7a7d 100644 --- a/flowcraft/generator/templates/megahit.nf +++ b/flowcraft/generator/templates/megahit.nf @@ -5,6 +5,8 @@ if ( params.megahitKmers{{ param_id }}.toString().split(" ").size() <= 1 ){ } IN_megahit_kmers_{{ pid }} = Channel.value(params.megahitKmers{{ param_id }}) +clear = params.clearInput{{ param_id }} ? "true" : "false" +checkpointClear_{{ pid }} = Channel.value(clear) process megahit_{{ pid }} { @@ -17,6 +19,7 @@ process megahit_{{ pid }} { input: set sample_id, file(fastq_pair), max_len from {{ input_channel }}.join(SIDE_max_len_{{ pid }}) val kmers from IN_megahit_kmers_{{ pid }} + val clear from checkpointClear_{{ pid }} output: set sample_id, file('*megahit*.fasta') into {{ output_channel }} diff --git a/flowcraft/generator/templates/metaspades.nf b/flowcraft/generator/templates/metaspades.nf index d7ce717c..685eb109 100644 --- a/flowcraft/generator/templates/metaspades.nf +++ b/flowcraft/generator/templates/metaspades.nf @@ -5,6 +5,8 @@ if ( params.metaspadesKmers{{ param_id }}.toString().split(" ").size() <= 1 ){ } IN_metaspades_kmers_{{pid}} = Channel.value(params.metaspadesKmers{{ param_id }}) +clear = params.clearInput{{ param_id }} ? "true" : "false" +checkpointClear_{{ pid }} = Channel.value(clear) process metaspades_{{ pid }} { @@ -17,6 +19,7 @@ process metaspades_{{ pid }} { input: set sample_id, file(fastq_pair), max_len from {{ input_channel }}.join(SIDE_max_len_{{ pid }}) val kmers from IN_metaspades_kmers_{{pid}} + val clear from checkpointClear_{{ pid }} output: set sample_id, file('*_metaspades*.fasta') into {{ output_channel }} diff --git a/flowcraft/generator/templates/momps.nf b/flowcraft/generator/templates/momps.nf index c1ea55ba..aad23e22 100644 --- a/flowcraft/generator/templates/momps.nf +++ b/flowcraft/generator/templates/momps.nf @@ -1,4 +1,7 @@ +clear = params.clearInput{{ param_id }} ? "true" : "false" +checkpointClear_{{ pid }} = Channel.value(clear) + process momps_{{ pid }} { // Send POST request to platform @@ -8,6 +11,7 @@ process momps_{{ pid }} { input: set sample_id, file(assembly), file(fastq) from {{ input_channel }}.join(_LAST_fastq_{{ pid }}) + val clear from checkpointClear_{{ pid }} output: file("*_st.tsv") into momps_st_{{ pid }} @@ -39,6 +43,17 @@ process momps_{{ pid }} { # Get the profile for the sample echo $sample_id\t\$(awk "NR == 7" res/*.MLST_res.txt) > ${sample_id}_profile.tsv rm -r res + + # Remove temporary input files when the clearInput option is used + if [ "$clear" = "true" ]; + then + work_regex=".*/work/.{2}/.{30}/.*" + file_source1=\$(readlink -f \$(pwd)/${fastq_pair[0]}) + file_source2=\$(readlink -f \$(pwd)/${fastq_pair[1]}) + if [[ "\$file_source1" =~ \$work_regex ]]; then + rm \$file_source1 \$file_source2 + fi + fi else echo fail > .status rm -r res diff --git a/flowcraft/generator/templates/pilon.nf b/flowcraft/generator/templates/pilon.nf index 6a59c0e6..f85f577d 100644 --- a/flowcraft/generator/templates/pilon.nf +++ b/flowcraft/generator/templates/pilon.nf @@ -1,4 +1,7 @@ +clear = params.clearInput{{ param_id }} ? "true" : "false" +checkpointClear_{{ pid }} = Channel.value(clear) + process pilon_{{ pid }} { // Send POST request to platform @@ -10,6 +13,7 @@ process pilon_{{ pid }} { input: set sample_id, file(assembly), file(bam_file), file(bam_index) from {{ input_channel }} + val clear from checkpointClear_{{ pid }} output: set sample_id, '*_polished.fasta' into {{ output_channel }}, pilon_report_{{ pid }} @@ -23,6 +27,17 @@ process pilon_{{ pid }} { pilon_mem=${String.valueOf(task.memory).substring(0, String.valueOf(task.memory).length() - 1).replaceAll("\\s", "")} java -jar -Xms256m -Xmx\${pilon_mem} /NGStools/pilon-1.22.jar --genome $assembly --frags $bam_file --output ${assembly.name.replaceFirst(~/\.[^\.]+$/, '')}_polished --changes --threads $task.cpus >> .command.log 2>&1 echo pass > .status + + if [ "$clear" = "true" ]; + then + work_regex=".*/work/.{2}/.{30}/.*" + assembly_file=\$(readlink -f \$(pwd)/${assembly}) + bam_file=\$(readlink -f \$(pwd)/${bam_file}) + if [[ "\$assembly_file" =~ \$work_regex ]]; then + rm \$assembly_file \$bam_file + fi + fi + } || { echo fail > .status } diff --git a/flowcraft/generator/templates/reads_download.nf b/flowcraft/generator/templates/reads_download.nf index 9292cffa..3b77985d 100644 --- a/flowcraft/generator/templates/reads_download.nf +++ b/flowcraft/generator/templates/reads_download.nf @@ -17,7 +17,7 @@ process reads_download_{{ pid }} { maxRetries 1 input: - set val(accession_id), val(name) from reads_download_in_1_0.splitText(){ it.trim() }.filter{ it != "" }.map{ it.split().length > 1 ? ["accession": it.split()[0], "name": it.split()[1]] : [it.split()[0], null] } + set val(accession_id), val(name) from reads_download_in_1_0.splitText(){ it.trim() }.filter{ it != "" }.map{ it.split().length > 1 ? ["accession": it.split()[0], "name": it.split()[1]] : [it.split()[0], null] } each file(aspera_key) from IN_asperaKey_{{ pid }} output: @@ -29,7 +29,9 @@ process reads_download_{{ pid }} { script: """ { + # getSeqENA requires accession numbers to be provided as a text file echo "${accession_id}" >> accession_file.txt + # Set default status value. It will be overwritten if anything goes wrong echo "pass" > ".status" if [ -f $aspera_key ]; then @@ -40,6 +42,8 @@ process reads_download_{{ pid }} { getSeqENA.py -l accession_file.txt \$asperaOpt -o ./ --SRAopt --downloadCramBam + # If a name has been provided along with the accession, rename the + # fastq files. if [ $name != null ]; then echo renaming pattern '${accession_id}' to '${name}' && cd ${accession_id} && rename "s/${accession_id}/${name}/" *.gz diff --git a/flowcraft/generator/templates/remove_host.nf b/flowcraft/generator/templates/remove_host.nf index aee89c84..f070c546 100644 --- a/flowcraft/generator/templates/remove_host.nf +++ b/flowcraft/generator/templates/remove_host.nf @@ -1,5 +1,7 @@ IN_index_files_{{ pid }} = Channel.value(params.refIndex{{ param_id }}) +clear = params.clearInput{{ param_id }} ? "true" : "false" +checkpointClear_{{ pid }} = Channel.value(clear) process remove_host_{{ pid }} { @@ -12,6 +14,7 @@ process remove_host_{{ pid }} { input: set sample_id, file(fastq_pair) from {{ input_channel }} val bowtie2Index from IN_index_files_{{ pid }} + val clear from checkpointClear_{{ pid }} output: set sample_id , file("${sample_id}*.headersRenamed_*.fq.gz") into {{ output_channel }} @@ -22,21 +25,35 @@ process remove_host_{{ pid }} { script: """ - bowtie2 -x ${bowtie2Index} -1 ${fastq_pair[0]} -2 ${fastq_pair[1]} -p $task.cpus 1> ${sample_id}.bam 2> ${sample_id}_bowtie2.log + { + bowtie2 -x ${bowtie2Index} -1 ${fastq_pair[0]} -2 ${fastq_pair[1]} -p $task.cpus 1> ${sample_id}.bam 2> ${sample_id}_bowtie2.log - samtools view -buh -f 12 -o ${sample_id}_samtools.bam -@ $task.cpus ${sample_id}.bam + samtools view -buh -f 12 -o ${sample_id}_samtools.bam -@ $task.cpus ${sample_id}.bam - rm ${sample_id}.bam + rm ${sample_id}.bam - samtools fastq -1 ${sample_id}_unmapped_1.fq -2 ${sample_id}_unmapped_2.fq ${sample_id}_samtools.bam + samtools fastq -1 ${sample_id}_unmapped_1.fq -2 ${sample_id}_unmapped_2.fq ${sample_id}_samtools.bam - rm ${sample_id}_samtools.bam + rm ${sample_id}_samtools.bam - renamePE_samtoolsFASTQ.py -1 ${sample_id}_unmapped_1.fq -2 ${sample_id}_unmapped_2.fq + renamePE_samtoolsFASTQ.py -1 ${sample_id}_unmapped_1.fq -2 ${sample_id}_unmapped_2.fq - gzip *.headersRenamed_*.fq + gzip *.headersRenamed_*.fq + rm *.fq - rm *.fq + if [ "$clear" = "true" ]; + then + work_regex=".*/work/.{2}/.{30}/.*" + file_source1=\$(readlink -f \$(pwd)/${fastq_pair[0]}) + file_source2=\$(readlink -f \$(pwd)/${fastq_pair[1]}) + if [[ "\$file_source1" =~ \$work_regex ]]; then + rm \$file_source1 \$file_source2 + fi + fi + + } || { + echo fail > .status + } """ } diff --git a/flowcraft/generator/templates/skesa.nf b/flowcraft/generator/templates/skesa.nf index 8a3b0f13..d8852908 100644 --- a/flowcraft/generator/templates/skesa.nf +++ b/flowcraft/generator/templates/skesa.nf @@ -1,5 +1,5 @@ -clear = params.clearAtCheckpoint ? "true" : "false" +clear = params.clearInput{{ param_id }} ? "true" : "false" checkpointClear_{{ pid }} = Channel.value(clear) process skesa_{{ pid }} { diff --git a/flowcraft/generator/templates/spades.nf b/flowcraft/generator/templates/spades.nf index b75afb8d..64b6e4c8 100644 --- a/flowcraft/generator/templates/spades.nf +++ b/flowcraft/generator/templates/spades.nf @@ -14,7 +14,7 @@ if ( params.spadesKmers{{ param_id }}.toString().split(" ").size() <= 1 ){ } IN_spades_kmers_{{pid}} = Channel.value(params.spadesKmers{{ param_id }}) -clear = params.clearAtCheckpoint ? "true" : "false" +clear = params.clearInput{{ param_id }} ? "true" : "false" checkpointClear_{{ pid }} = Channel.value(clear) process spades_{{ pid }} { diff --git a/flowcraft/generator/templates/trimmomatic.nf b/flowcraft/generator/templates/trimmomatic.nf index a13a9514..d096c814 100644 --- a/flowcraft/generator/templates/trimmomatic.nf +++ b/flowcraft/generator/templates/trimmomatic.nf @@ -15,7 +15,7 @@ if ( !params.trimMinLength{{ param_id}}.toString().isNumber() ){ IN_trimmomatic_opts_{{ pid }} = Channel.value([params.trimSlidingWindow{{ param_id}},params.trimLeading{{ param_id}},params.trimTrailing{{ param_id}},params.trimMinLength{{ param_id}}]) IN_adapters_{{ pid }} = Channel.value(params.adapters{{ param_id}}) -clear = params.clearAtCheckpoint ? "true" : "false" +clear = params.clearInput{{ param_id }} ? "true" : "false" checkpointClear_{{ pid }} = Channel.value(clear) process trimmomatic_{{ pid }} { diff --git a/flowcraft/generator/templates/viral_assembly.nf b/flowcraft/generator/templates/viral_assembly.nf index 0a86be23..d4f90c44 100644 --- a/flowcraft/generator/templates/viral_assembly.nf +++ b/flowcraft/generator/templates/viral_assembly.nf @@ -22,8 +22,9 @@ if ( params.spadesKmers{{ param_id }}.toString().split(" ").size() <= 1 ){ } } -clear = params.clearAtCheckpoint ? "true" : "false" -checkpointClear_{{ pid }} = Channel.value(clear) +clear = params.clearInput{{ param_id }} ? "true" : "false" +checkpointClearSpades_{{ pid }} = Channel.value(clear) +checkpointClearMegahit_{{ pid }} = Channel.value(clear) //MEGAHIT OPTIONS if ( params.megahitKmers{{ param_id }}.toString().split(" ").size() <= 1 ){ @@ -57,7 +58,7 @@ process va_spades_{{ pid }} { set sample_id, file(fastq_pair), max_len from spades_in.join(SIDE_max_len_spades) val opts from IN_spades_opts_{{ pid }} val kmers from IN_spades_kmers_{{ pid }} - val clear from checkpointClear_{{ pid }} + val clear from checkpointClearSpades_{{ pid }} output: set sample_id, file({task.exitStatus == 1 ? ".exitcode" : '*_spades*.fasta'}) into assembly_spades @@ -115,6 +116,7 @@ process va_megahit_{{ pid }} { input: set sample_id, file(fastq_pair), max_len from megahit_in.join(megahit).map{ ot -> [ot[0], ot[1]] }.join(SIDE_max_len_megahit) val kmers from IN_megahit_kmers_{{ pid }} + val clear from checkpointClearSpades_{{ pid }} output: set sample_id, file('*megahit*.fasta') into megahit_assembly diff --git a/flowcraft/templates/megahit.py b/flowcraft/templates/megahit.py index b7e91830..6f7cefda 100644 --- a/flowcraft/templates/megahit.py +++ b/flowcraft/templates/megahit.py @@ -19,6 +19,8 @@ - ``kmers`` : Setting for megahit kmers. Can be either ``'auto'``, \ ``'default'`` or a user provided list. All must be odd, in the range 15-255, increment <= 28 - e.g.: ``'auto'`` or ``'default'`` or ``'55 77 99 113 127'`` +- ``clear`` : If 'true', remove the input fastq files at the end of the + component run, IF THE FILES ARE IN THE WORK DIRECTORY Generated output ---------------- @@ -40,6 +42,7 @@ __template__ = "megahit-nf" import os +import re import subprocess from subprocess import PIPE @@ -82,12 +85,14 @@ def __get_version_megahit(): MAX_LEN = int('$max_len'.strip()) KMERS = '$kmers'.strip() MEM = '$task.memory' + CLEAR = '$clear' logger.debug("Running {} with parameters:".format( os.path.basename(__file__))) logger.debug("SAMPLE_ID: {}".format(SAMPLE_ID)) logger.debug("FASTQ_PAIR: {}".format(FASTQ_PAIR)) logger.debug("MAX_LEN: {}".format(MAX_LEN)) logger.debug("KMERS: {}".format(KMERS)) + logger.debug("CLEAR: {}".format(CLEAR)) def set_kmers(kmer_opt, max_read_len): @@ -169,8 +174,27 @@ def fix_contig_names(asseembly_path): return fixed_assembly +def clean_up(fastq): + """ + Cleans the temporary fastq files. If they are symlinks, the link + source is removed + + Parameters + ---------- + fastq : list + List of fastq files. + """ + + for fq in fastq: + # Get real path of fastq files, following symlinks + rp = os.path.realpath(fq) + logger.debug("Removing temporary fastq file path: {}".format(rp)) + if re.match(".*/work/.{2}/.{30}/.*", rp): + os.remove(rp) + + @MainWrapper -def main(sample_id, fastq_pair, max_len, kmer, mem): +def main(sample_id, fastq_pair, max_len, kmer, mem, clear): """Main executor of the megahit template. Parameters @@ -264,7 +288,12 @@ def main(sample_id, fastq_pair, max_len, kmer, mem): os.rename(fixed_assembly, assembly_file) logger.info("Setting main assembly file to: {}".format(assembly_file)) + # Remove input fastq files when clear option is specified. + # Only remove temporary input when the expected output exists. + if clear == "true" and os.path.exists(assembly_file): + clean_up(fastq_pair) + if __name__ == '__main__': - main(SAMPLE_ID, FASTQ_PAIR, MAX_LEN, KMERS, MEM) + main(SAMPLE_ID, FASTQ_PAIR, MAX_LEN, KMERS, MEM, CLEAR) diff --git a/flowcraft/templates/metaspades.py b/flowcraft/templates/metaspades.py index d17d43d4..80a4d4db 100644 --- a/flowcraft/templates/metaspades.py +++ b/flowcraft/templates/metaspades.py @@ -40,6 +40,7 @@ __template__ = "metaspades-nf" import os +import re import subprocess from subprocess import PIPE @@ -74,12 +75,33 @@ def __get_version_spades(): FASTQ_PAIR = '$fastq_pair'.split() MAX_LEN = int('$max_len'.strip()) KMERS = '$kmers'.strip() + CLEAR = '$clear' logger.debug("Running {} with parameters:".format( os.path.basename(__file__))) logger.debug("SAMPLE_ID: {}".format(SAMPLE_ID)) logger.debug("FASTQ_PAIR: {}".format(FASTQ_PAIR)) logger.debug("MAX_LEN: {}".format(MAX_LEN)) logger.debug("KMERS: {}".format(KMERS)) + logger.debug("CLEAR: {}".format(CLEAR)) + + +def clean_up(fastq): + """ + Cleans the temporary fastq files. If they are symlinks, the link + source is removed + + Parameters + ---------- + fastq : list + List of fastq files. + """ + + for fq in fastq: + # Get real path of fastq files, following symlinks + rp = os.path.realpath(fq) + logger.debug("Removing temporary fastq file path: {}".format(rp)) + if re.match(".*/work/.{2}/.{30}/.*", rp): + os.remove(rp) def set_kmers(kmer_opt, max_read_len): @@ -129,7 +151,7 @@ def set_kmers(kmer_opt, max_read_len): @MainWrapper -def main(sample_id, fastq_pair, max_len, kmer): +def main(sample_id, fastq_pair, max_len, kmer, clear): """Main executor of the spades template. Parameters @@ -211,7 +233,12 @@ def main(sample_id, fastq_pair, max_len, kmer): os.rename("contigs.fasta", assembly_file) logger.info("Setting main assembly file to: {}".format(assembly_file)) + # Remove input fastq files when clear option is specified. + # Only remove temporary input when the expected output exists. + if clear == "true" and os.path.exists(assembly_file): + clean_up(fastq_pair) + if __name__ == '__main__': - main(SAMPLE_ID, FASTQ_PAIR, MAX_LEN, KMERS) + main(SAMPLE_ID, FASTQ_PAIR, MAX_LEN, KMERS, CLEAR) diff --git a/flowcraft/templates/spades.py b/flowcraft/templates/spades.py index 32bd9e68..b283c6d9 100644 --- a/flowcraft/templates/spades.py +++ b/flowcraft/templates/spades.py @@ -258,7 +258,7 @@ def main(sample_id, fastq_pair, max_len, kmer, opts, clear): # Remove input fastq files when clear option is specified. # Only remove temporary input when the expected output exists. - if clear == "true" and os.path.exists("contigs.fasta"): + if clear == "true" and os.path.exists(assembly_file): clean_up(fastq_pair) diff --git a/flowcraft/templates/trimmomatic.py b/flowcraft/templates/trimmomatic.py index 7264bd1c..0ddc28fb 100644 --- a/flowcraft/templates/trimmomatic.py +++ b/flowcraft/templates/trimmomatic.py @@ -392,7 +392,9 @@ def main(sample_id, fastq_pair, trim_range, trim_opts, phred, adapters_file, trimmomatic_log("{}_trimlog.txt".format(sample_id), sample_id) - clean_up(fastq_pair, clear) + if p.returncode == 0 and os.path.exists("{}_1_trim.fastq.gz".format( + SAMPLE_ID)): + clean_up(fastq_pair, clear) # Check if trimmomatic ran successfully. If not, write the error message # to the status channel and exit.