From 321b01388075fb9017115203c6e057e07eb2c14f Mon Sep 17 00:00:00 2001 From: Nils Oberg Date: Tue, 7 Apr 2026 17:19:25 -0500 Subject: [PATCH 01/16] Refactor EST all-by-all to support convergence ratio staging --- pipelines/est/est.nf | 11 +++- pipelines/est/subworkflows/all_by_all.nf | 74 +++++++++++++++--------- 2 files changed, 57 insertions(+), 28 deletions(-) diff --git a/pipelines/est/est.nf b/pipelines/est/est.nf index 21bb516c..069788c2 100644 --- a/pipelines/est/est.nf +++ b/pipelines/est/est.nf @@ -7,8 +7,15 @@ workflow { initial_data = IMPORT_AND_FILTER() - axa_results = ALL_BY_ALL(initial_data.fasta_file) + // Wrap the fasta file into a tuple. This is because we share code with the convergence + // ratio pipeline, which needs to run all of the processes in ALL_BY_ALL on many fasta + // files, not just the one that we have here. + blast_input = initial_data.fasta_file.map { fasta_file -> tuple("ALL_DATA", fasta_file) } + axa_results = ALL_BY_ALL(blast_input) - results = REPORTING(axa_results.blast_parquet, axa_results.fasta_lengths_parquet, initial_data.fasta_file, initial_data.accession_table, initial_data.import_stats) + blast_parquet = axa_results.blast_parquet.map { junk, file -> file } + fasta_lengths_parquet = axa_results.fasta_lengths_parquet.map { junk, file -> file } + + results = REPORTING(blast_parquet, fasta_lengths_parquet, initial_data.fasta_file, initial_data.accession_table, initial_data.import_stats) } diff --git a/pipelines/est/subworkflows/all_by_all.nf b/pipelines/est/subworkflows/all_by_all.nf index dbced3fe..88c3b825 100644 --- a/pipelines/est/subworkflows/all_by_all.nf +++ b/pipelines/est/subworkflows/all_by_all.nf @@ -1,12 +1,12 @@ -include { condense_redundant } from "../../shared/nextflow/sequence.nf" - process create_blast_db { input: - path fasta_file + tuple val(fid), path(fasta_file) + output: - path "database.*", emit: 'database_files' - val "database", emit: 'database_name' + tuple val(fid), path("database.*"), val("database") + + script: """ formatdb -i $fasta_file -n database -p T -o T """ @@ -14,11 +14,11 @@ process create_blast_db { process all_by_all_blast { input: - path(blast_db_files, arity: 5) - val blast_db_name - path frac + tuple val(fid), path(blast_db_files, arity: 5), val(blast_db_name), path(frac) + output: - path "${frac}.tab.sorted.parquet" + tuple val(fid), path("${frac}.tab.sorted.parquet") + script: """ # run blast to get similarity metrics @@ -36,10 +36,12 @@ process all_by_all_blast { process blastreduce_transcode_fasta { input: - path fasta_file + tuple val(fid), path(fasta_file) + output: - path "${fasta_file.getName()}.parquet" + tuple val(fid), path("${fasta_file.getName()}.parquet") + script: """ python $projectDir/blastreduce/transcode_fasta_lengths.py --fasta $fasta_file --output ${fasta_file.getName()}.parquet """ @@ -47,9 +49,12 @@ process blastreduce_transcode_fasta { process split_fasta { input: - path fasta_file + tuple val(fid), path(fasta_file) + output: - path "fracfile-*.fa" + tuple val(fid), path("fracfile-*.fa") + + script: """ perl $projectDir/split_fasta/split_fasta.pl -parts ${params.num_fasta_shards} -source ${fasta_file} """ @@ -57,13 +62,14 @@ process split_fasta { process blastreduce { publishDir params.final_output_dir, mode: 'copy', enabled: !params.multiplex + input: - path blast_files - path fasta_length_parquet + tuple val(fid), path(blast_files), path(fasta_length_parquet) output: - path "1.out.parquet" + tuple val(fid), path("1.out.parquet") + script: """ DUCKDB_TEMP="${params.duckdb_temp_dir}/duckdb-${task.index}-"\$(date +%s) python $projectDir/blastreduce/render_reduce_sql_template.py --blast-output $blast_files --sql-template $projectDir/templates/reduce-template.sql --fasta-length-parquet $fasta_length_parquet --duckdb-memory-limit ${params.duckdb_memory_limit} --duckdb-temp-dir \${DUCKDB_TEMP} --sql-output-file allreduce.sql @@ -71,14 +77,32 @@ process blastreduce { """ } +// Formerly known as multiplex +process condense_redundant { + input: + tuple val(fid), path(fasta_file) + + output: + tuple val(fid), path("sequences.fasta"), emit: "fasta_file" + tuple val(fid), path("sequences.fasta.clstr"), emit: "condensed" + + script: + """ + cd-hit -d 0 -c 1 -s 1 -i ${fasta_file} -o sequences.fasta -M "${params.cdhit_memory_limit}" + """ +} + // Formerly known as demultiplex process restore_condensed { publishDir params.final_output_dir, mode: 'copy', overwrite: true + input: - path blast_parquet, stageAs: 'reduced.parquet' - path condensed + tuple val(fid), path(blast_parquet, stageAs: "reduced.parquet"), path(condensed) + output: - path '1.out.parquet' + tuple val(fid), path("1.out.parquet") + + script: """ DUCKDB_TEMP="${params.duckdb_temp_dir}/duckdb-${task.index}-"\$(date +%s) python $projectDir/condense/render_restore_sql_template.py --blast-parquet $blast_parquet --sql-template $projectDir/templates/restore-template.sql --duckdb-memory-limit ${params.duckdb_memory_limit} --duckdb-temp-dir \${DUCKDB_TEMP} --sql-output-file restore.sql @@ -111,18 +135,16 @@ workflow ALL_BY_ALL { // All-by-all BLAST fasta_shards = split_fasta(blast_input_fasta) - blast_fractions = all_by_all_blast( - blastdb.database_files, - blastdb.database_name, - fasta_shards.flatten() - ) | collect + + blast_input = blastdb.combine(fasta_shards.transpose(), by: 0) + blast_fractions = all_by_all_blast( blast_input ).groupTuple() // Eliminate duplicate and self-edges - reduced_blast_parquet = blastreduce(blast_fractions, fasta_lengths_parquet) + reduced_blast_parquet = blastreduce(blast_fractions.join(fasta_lengths_parquet)) // Expand redundant sequences after BLAST computation (formerly known as demultiplex) if (params.multiplex) { - reduced_blast_parquet = restore_condensed(reduced_blast_parquet, condensed) + reduced_blast_parquet = restore_condensed(reduced_blast_parquet.join(condensed)) } emit: From b05fec94c5c8af20078061c137bd3727170ab0c7 Mon Sep 17 00:00:00 2001 From: Nils Oberg Date: Tue, 7 Apr 2026 17:22:25 -0500 Subject: [PATCH 02/16] Add convergence ratio pipeline - Extensively borrows from EST, for now is integrated into pipelines/est directory --- bin/create_conv_ratio_nextflow_params.py | 84 ++++++++++++ pipelines/est/convergenceratio.nf | 128 ++++++++++++++++++ pipelines/est/statistics/merge_conv_ratios.py | 103 ++++++++++++++ 3 files changed, 315 insertions(+) create mode 100755 bin/create_conv_ratio_nextflow_params.py create mode 100644 pipelines/est/convergenceratio.nf create mode 100644 pipelines/est/statistics/merge_conv_ratios.py diff --git a/bin/create_conv_ratio_nextflow_params.py b/bin/create_conv_ratio_nextflow_params.py new file mode 100755 index 00000000..d209ebe0 --- /dev/null +++ b/bin/create_conv_ratio_nextflow_params.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python3 + +import argparse +import glob +import json +import os + +import shared_args + +NXF_SCRIPT = "pipelines/est/convergenceratio.nf" + +def add_args(parser: argparse.ArgumentParser): + """ + Add arguments for Convergence Ratio pipeline to ``parser`` + """ + parser.add_argument("--ascore", required=False, type=int, default=0, help="The alignment score to use for the BLAST computations; this is converted to an e-value") + parser.add_argument("--colored-ssn-input", required=True, type=str, help="The SSN file to use for computing convergence ratios; must be the output of the Color SSN or Cluster Analysis pipelines") + parser.add_argument("--fasta-db", type=str, required=True, help="FASTA file or BLAST database to retrieve sequences from") + shared_args.add_args(parser) + +def check_args(args: argparse.Namespace) -> argparse.Namespace: + """ + Test file paths and rewrite them to be absolute + """ + fail = False + + # Check for shared args validity + validated_args = shared_args.check_args(args) + if validated_args is None: + fail = True + else: + args = validated_args + + if not os.path.exists(args.colored_ssn_input): + print(f"SSN Input file '{args.colored_ssn_input}' does not exist") + fail = True + + if len(glob.glob(f"{args.fasta_db}.*")) == 0: + print(f"FASTA database '{args.fasta_db}' not found") + fail = True + + if args.workflow_def is None: + args.workflow_def = os.path.abspath(NXF_SCRIPT) + + if fail: + print("Failed to render params template") + exit(1) + else: + args.colored_ssn_input = os.path.abspath(args.colored_ssn_input) + args.fasta_db = os.path.abspath(args.fasta_db) + return args + +def create_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Render params.yml for GND nextflow pipeline") + add_args(parser) + return parser + +def render_params(colored_ssn_input, ascore, efi_config, efi_db, fasta_db, output_dir, **kwargs: dict): + params = { + "ascore": ascore, + "efi_config": efi_config, + "efi_db": efi_db, + "fasta_db": fasta_db, + "final_output_dir": output_dir, + "ssn_input": colored_ssn_input, + } + + # Handle kwargs dict, assuming each entry is a parameter to be added to params + params.update(kwargs) + + # Remove parameter keys with None values + params = {key: value for key, value in params.items() if value != None} + + params_file = os.path.join(output_dir, shared_args.PARAMS_NAME) + with open(params_file, "w") as f: + json.dump(params, f, indent=4) + print(f"Wrote params to '{params_file}'") + return params_file + +if __name__ == "__main__": + args = check_args(create_parser().parse_args()) + params_file = render_params(**vars(args)) + shared_args.save_run_script(args, workflow_def=args.workflow_def, params_file=params_file) + diff --git a/pipelines/est/convergenceratio.nf b/pipelines/est/convergenceratio.nf new file mode 100644 index 00000000..b841cf8e --- /dev/null +++ b/pipelines/est/convergenceratio.nf @@ -0,0 +1,128 @@ + +include { all_by_all_blast; blastreduce; blastreduce_transcode_fasta; condense_redundant; create_blast_db; restore_condensed; split_fasta } from "../est/subworkflows/all_by_all.nf" +include { unzip_ssn } from "../shared/nextflow/util.nf" +include { compute_clusters; get_id_list; get_ssn_id_info } from "../shared/nextflow/color_workflow.nf" + +process compute_conv_ratio { + input: + tuple val(cluster_id), path(blast_parquet), path(fasta_file) + + output: + path("${cluster_id}_conv_ratio.json") + + script: + """ + python $projectDir/statistics/conv_ratio.py --blast-output ${blast_parquet} --fasta ${fasta_file} --output "${cluster_id}_conv_ratio.json" + """ +} + +process merge_conv_ratios { + publishDir params.final_output_dir, mode: "copy", pattern: "{conv_ratio.tab}" + + input: + path stats_files + + output: + path "conv_ratio.tab" + + script: + """ + python $projectDir/statistics/merge_conv_ratios.py --stats ${stats_files} --output conv_ratio.tab + """ +} + +process get_fasta_files { + input: + tuple val(cluster_id), path(id_file) + + output: + tuple val(cluster_id), path("*.fasta", arity: "1") + + script: + """ + base_filename=\$(basename $id_file .txt) + fasta_file="\${base_filename}.fasta" + perl $projectDir/../shared/perl/get_sequences.pl --fasta-db ${params.fasta_db} --sequence-ids-file ${id_file} --output-sequence-file \${fasta_file} + """ +} + +process get_selected_id_lists { + input: + path uniprot_dir + path uniref90_dir + path uniref50_dir + + output: + path "selected_ids/*.txt", emit: "id_files" + + script: + """ + mkdir -p selected_ids + + if ls ${uniref50_dir}/*.txt 1> /dev/null 2>&1; then + cp ${uniref50_dir}/*.txt selected_ids/ + elif ls ${uniref90_dir}/*.txt 1> /dev/null 2>&1; then + cp ${uniref90_dir}/*.txt selected_ids/ + else + cp ${uniprot_dir}/*.txt selected_ids/ + fi + """ +} + +workflow { + if (params.ssn_input =~ /\.zip$/) { + ssn_file = unzip_ssn(params.ssn_input) + } else { + ssn_file = params.ssn_input + } + + // Get the index and ID mapping tables and edgelist + ssn_data = get_ssn_id_info(ssn_file) + + // Convert to value channel + sequence_type_val = ssn_data.sequence_type.map { it.trim() } + + compute_info = compute_clusters(ssn_data.edgelist, ssn_data.index_seqid_map) + + id_list_data = get_id_list(compute_info.cluster_id_map, compute_info.singletons, ssn_data.seqid_source_map, sequence_type_val) + + input_seq_type_id_lists = get_selected_id_lists(id_list_data.uniprot_dir, id_list_data.uniref90_dir, id_list_data.uniref50_dir) + + cluster_id_lists = input_seq_type_id_lists + .flatten() + .filter { file -> !file.name.contains("_All") } // Don"t include the file with all sequences in the analysis + .filter { file -> !file.name.contains("singleton") } // Don"t include singletons in the analysis + .map { file -> + // Transform the file name into a cluster ID. Removes "cluster_UniProt_" (or + // "cluster_UniRefXX_") from the front if present + def clean_id = file.simpleName.replaceAll(/^cluster_Uni(Prot|Ref90|Ref50)_/, "") + return tuple(clean_id, file) + } + + cluster_fasta = get_fasta_files(cluster_id_lists) + // cluster_fasta is a channel of tuples of [clusterId, file] + + // Run a workflow that is nearly identical to the ALL_BY_ALL workflow in the est pipeline. + reduced_fasta = condense_redundant(cluster_fasta) + + blast_databases = create_blast_db(reduced_fasta.fasta_file) + + fasta_lengths_parquet = blastreduce_transcode_fasta(cluster_fasta) + + fasta_shards = split_fasta(reduced_fasta.fasta_file) + + blast_input = blast_databases.combine(fasta_shards.transpose(), by: 0) +blast_input.view() + +// blast_fractions = all_by_all_blast( blast_input ).groupTuple() +// +// reduced_blast_parquet = blastreduce(blast_fractions.join(fasta_lengths_parquet)) +// +// // Expand redundant sequences after BLAST computation (formerly known as demultiplex) +// blast_parquet = restore_condensed(reduced_blast_parquet.join(reduced_fasta.condensed)) +// +// stats_files = compute_conv_ratio(blast_parquet.combine(fasta_lengths_parquet, by: 0)) +// +// merge_conv_ratios(stats_files.collect()) +} + diff --git a/pipelines/est/statistics/merge_conv_ratios.py b/pipelines/est/statistics/merge_conv_ratios.py new file mode 100644 index 00000000..34972159 --- /dev/null +++ b/pipelines/est/statistics/merge_conv_ratios.py @@ -0,0 +1,103 @@ + +import argparse +import json +import glob +import sys +import os +import math +import re + +def main(): + parser = argparse.ArgumentParser( + description="Merge individual convergence ratio JSON files into a single tab-separated file." + ) + + parser.add_argument( + '--output', + required=True, + help="Path to the output .tab file" + ) + parser.add_argument( + '--stats', + nargs='*', + help="List of input JSON files. If not provided, globs *_conv_ratio.json in the current directory." + ) + + args = parser.parse_args() + + input_files = args.stats if args.stats else glob.glob('*_conv_ratio.json') + + if not input_files: + print("Warning: No input JSON files found.", file=sys.stderr) + + # Read all data into a dictionary first + data_dict = {} + for json_file in input_files: + filename = os.path.basename(json_file) + + # Extract the cluster number + match = re.search(r'[cC]luster_(\d+)_conv_ratio\.json', filename) + if match: + cluster_id = match.group(1) + else: + cluster_id = filename.replace('_conv_ratio.json', '') + + try: + with open(json_file, 'r') as f_in: + data = json.load(f_in) + + ratio_val = data.get('convergence_ratio', 'NA') + edges_val = data.get('num_blast_edges', 'NA') + nodes_val = data.get('num_unique_ids', 'NA') + + if ratio_val != 'NA': + ratio_val = float(ratio_val) + + data_dict[cluster_id] = { + 'ratio': ratio_val, + 'edges': edges_val, + 'nodes': nodes_val + } + except Exception as e: + print(f"Error processing file {json_file}: {e}", file=sys.stderr) + + # Calculate global min, max, and formatting digits + # Extract only the valid floats from our nested dictionaries + valid_ratios = [v['ratio'] for v in data_dict.values() if isinstance(v['ratio'], float)] + + digits = 2 + if valid_ratios: + max_val = max(valid_ratios) + min_val = min(valid_ratios) + diff = max_val - min_val + + if diff > 1e10: + digits = -int(math.log(diff) - 0.5) + 2 + + digits = max(0, digits) + + # Output the data to a tab-separated file + try: + with open(args.output, 'w') as f_out: + f_out.write("\t".join(["Cluster Number", "Convergence Ratio", "Number of IDs", "Number of BLAST Matches", "SSN Cluster Convergence Ratio", "Number of Nodes", "Number of Edges"]) + "\n") + + # Iterate through the dictionary and write the formatted rows + for cluster_id, metrics in data_dict.items(): + ratio_val = metrics['ratio'] + edges_val = metrics['edges'] + nodes_val = metrics['nodes'] + + if isinstance(ratio_val, float): + formatted_ratio = f"{ratio_val:.{digits}e}" + else: + formatted_ratio = str(ratio_val) + + # Write all four columns separated by tabs + f_out.write("\t".join([cluster_id, formatted_ratio, str(edges_val), str(nodes_val), ]) + "\n") + + except IOError as e: + print(f"Error writing to output file {args.output}: {e}", file=sys.stderr) + sys.exit(1) + +if __name__ == "__main__": + main() From 8ee5e39043fb9c2374015eb63095e230df04905f Mon Sep 17 00:00:00 2001 From: Nils Oberg Date: Tue, 7 Apr 2026 17:23:34 -0500 Subject: [PATCH 03/16] Refactor to remove redundance processes --- pipelines/est/subworkflows/import.nf | 17 ++++++++++++++++- pipelines/shared/nextflow/sequence.nf | 27 --------------------------- 2 files changed, 16 insertions(+), 28 deletions(-) diff --git a/pipelines/est/subworkflows/import.nf b/pipelines/est/subworkflows/import.nf index 4b88dec8..cd26f2e0 100644 --- a/pipelines/est/subworkflows/import.nf +++ b/pipelines/est/subworkflows/import.nf @@ -1,5 +1,5 @@ -include { get_sequences; split_sequence_ids } from "../../shared/nextflow/sequence.nf" +include { get_sequences } from "../../shared/nextflow/sequence.nf" process get_source_ids { publishDir params.final_output_dir, mode: 'copy' @@ -108,6 +108,21 @@ process cat_fasta_files { } } +process split_sequence_ids { + input: + path accessions_file + val num_accession_shards + output: + path "accession_ids.txt.part*" + """ + if [[ -s "${accessions_file}" ]]; then + split -d -e -n r/$num_accession_shards ${accessions_file} accession_ids.txt.part + else + touch accession_ids.txt.part + fi + """ +} + process import_fasta { publishDir params.final_output_dir, mode: 'copy' input: diff --git a/pipelines/shared/nextflow/sequence.nf b/pipelines/shared/nextflow/sequence.nf index 6f629bfb..39604447 100644 --- a/pipelines/shared/nextflow/sequence.nf +++ b/pipelines/shared/nextflow/sequence.nf @@ -1,19 +1,4 @@ -process split_sequence_ids { - input: - path accessions_file - val num_accession_shards - output: - path "accession_ids.txt.part*" - """ - if [[ -s "${accessions_file}" ]]; then - split -d -e -n r/$num_accession_shards ${accessions_file} accession_ids.txt.part - else - touch accession_ids.txt.part - fi - """ -} - process get_sequences { input: path accession_ids @@ -29,18 +14,6 @@ process get_sequences { """ } -// Formerly known as multiplex -process condense_redundant { - input: - path fasta_file - output: - path "sequences.fasta", emit: "fasta_file" - path "sequences.fasta.clstr", emit: "condensed" - """ - cd-hit -d 0 -c 1 -s 1 -i ${fasta_file} -o sequences.fasta -M "${params.cdhit_memory_limit}" - """ -} - process get_length_histogram { input: path fasta_file From e3a011f3998b793df96758240d062954602e3133 Mon Sep 17 00:00:00 2001 From: Nils Oberg Date: Tue, 7 Apr 2026 17:35:01 -0500 Subject: [PATCH 04/16] Remove debugging steps in conv ratio pipeline --- pipelines/est/convergenceratio.nf | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/pipelines/est/convergenceratio.nf b/pipelines/est/convergenceratio.nf index b841cf8e..f2a26f1c 100644 --- a/pipelines/est/convergenceratio.nf +++ b/pipelines/est/convergenceratio.nf @@ -112,17 +112,16 @@ workflow { fasta_shards = split_fasta(reduced_fasta.fasta_file) blast_input = blast_databases.combine(fasta_shards.transpose(), by: 0) -blast_input.view() - -// blast_fractions = all_by_all_blast( blast_input ).groupTuple() -// -// reduced_blast_parquet = blastreduce(blast_fractions.join(fasta_lengths_parquet)) -// -// // Expand redundant sequences after BLAST computation (formerly known as demultiplex) -// blast_parquet = restore_condensed(reduced_blast_parquet.join(reduced_fasta.condensed)) -// -// stats_files = compute_conv_ratio(blast_parquet.combine(fasta_lengths_parquet, by: 0)) -// -// merge_conv_ratios(stats_files.collect()) + + blast_fractions = all_by_all_blast( blast_input ).groupTuple() + + reduced_blast_parquet = blastreduce(blast_fractions.join(fasta_lengths_parquet)) + + // Expand redundant sequences after BLAST computation (formerly known as demultiplex) + blast_parquet = restore_condensed(reduced_blast_parquet.join(reduced_fasta.condensed)) + + stats_files = compute_conv_ratio(blast_parquet.combine(fasta_lengths_parquet, by: 0)) + + merge_conv_ratios(stats_files.collect()) } From c2c9bd709143efe262800c377f1187bd2fac872d Mon Sep 17 00:00:00 2001 From: Nils Oberg Date: Wed, 8 Apr 2026 15:05:44 -0500 Subject: [PATCH 05/16] Convert alignment score into BLAST evalue for conv ratio calculation --- bin/create_conv_ratio_nextflow_params.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/bin/create_conv_ratio_nextflow_params.py b/bin/create_conv_ratio_nextflow_params.py index d209ebe0..3996588c 100755 --- a/bin/create_conv_ratio_nextflow_params.py +++ b/bin/create_conv_ratio_nextflow_params.py @@ -13,7 +13,7 @@ def add_args(parser: argparse.ArgumentParser): """ Add arguments for Convergence Ratio pipeline to ``parser`` """ - parser.add_argument("--ascore", required=False, type=int, default=0, help="The alignment score to use for the BLAST computations; this is converted to an e-value") + parser.add_argument("--ascore", required=False, type=int, default=5, help="The alignment score to use for the BLAST computations; this is converted to an e-value") parser.add_argument("--colored-ssn-input", required=True, type=str, help="The SSN file to use for computing convergence ratios; must be the output of the Color SSN or Cluster Analysis pipelines") parser.add_argument("--fasta-db", type=str, required=True, help="FASTA file or BLAST database to retrieve sequences from") shared_args.add_args(parser) @@ -34,7 +34,7 @@ def check_args(args: argparse.Namespace) -> argparse.Namespace: if not os.path.exists(args.colored_ssn_input): print(f"SSN Input file '{args.colored_ssn_input}' does not exist") fail = True - + if len(glob.glob(f"{args.fasta_db}.*")) == 0: print(f"FASTA database '{args.fasta_db}' not found") fail = True @@ -49,15 +49,16 @@ def check_args(args: argparse.Namespace) -> argparse.Namespace: args.colored_ssn_input = os.path.abspath(args.colored_ssn_input) args.fasta_db = os.path.abspath(args.fasta_db) return args - + def create_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="Render params.yml for GND nextflow pipeline") add_args(parser) return parser def render_params(colored_ssn_input, ascore, efi_config, efi_db, fasta_db, output_dir, **kwargs: dict): + evalue = f"1e-{ascore}" params = { - "ascore": ascore, + "blast_evalue": evalue, "efi_config": efi_config, "efi_db": efi_db, "fasta_db": fasta_db, From 013eac1284487bc816bcd5964d16b85b36905b82 Mon Sep 17 00:00:00 2001 From: Nils Oberg Date: Wed, 8 Apr 2026 15:06:09 -0500 Subject: [PATCH 06/16] Add color SSN convergence ratio and add code documentation --- pipelines/est/convergenceratio.nf | 63 +++++++++++++++++++++++-------- 1 file changed, 48 insertions(+), 15 deletions(-) diff --git a/pipelines/est/convergenceratio.nf b/pipelines/est/convergenceratio.nf index f2a26f1c..5300e3af 100644 --- a/pipelines/est/convergenceratio.nf +++ b/pipelines/est/convergenceratio.nf @@ -1,9 +1,9 @@ include { all_by_all_blast; blastreduce; blastreduce_transcode_fasta; condense_redundant; create_blast_db; restore_condensed; split_fasta } from "../est/subworkflows/all_by_all.nf" include { unzip_ssn } from "../shared/nextflow/util.nf" -include { compute_clusters; get_id_list; get_ssn_id_info } from "../shared/nextflow/color_workflow.nf" +include { compute_clusters; get_conv_ratio_table; get_id_list; get_ssn_id_info } from "../shared/nextflow/color_workflow.nf" -process compute_conv_ratio { +process compute_blast_conv_ratio { input: tuple val(cluster_id), path(blast_parquet), path(fasta_file) @@ -12,7 +12,10 @@ process compute_conv_ratio { script: """ - python $projectDir/statistics/conv_ratio.py --blast-output ${blast_parquet} --fasta ${fasta_file} --output "${cluster_id}_conv_ratio.json" + python $projectDir/statistics/conv_ratio.py \ + --blast-output ${blast_parquet} \ + --fasta ${fasta_file} \ + --output "${cluster_id}_conv_ratio.json" """ } @@ -21,13 +24,17 @@ process merge_conv_ratios { input: path stats_files + path cr_table output: path "conv_ratio.tab" script: """ - python $projectDir/statistics/merge_conv_ratios.py --stats ${stats_files} --output conv_ratio.tab + python $projectDir/statistics/merge_conv_ratios.py \ + --ssn-conv-ratio ${cr_table} \ + --stats ${stats_files} \ + --output conv_ratio.tab """ } @@ -40,9 +47,10 @@ process get_fasta_files { script: """ - base_filename=\$(basename $id_file .txt) - fasta_file="\${base_filename}.fasta" - perl $projectDir/../shared/perl/get_sequences.pl --fasta-db ${params.fasta_db} --sequence-ids-file ${id_file} --output-sequence-file \${fasta_file} + perl $projectDir/../shared/perl/get_sequences.pl \ + --fasta-db ${params.fasta_db} \ + --sequence-ids-file ${id_file} \ + --output-sequence-file "${id_file.baseName}.fasta" """ } @@ -76,18 +84,27 @@ workflow { ssn_file = params.ssn_input } - // Get the index and ID mapping tables and edgelist - ssn_data = get_ssn_id_info(ssn_file) + // + // STEP 1: GET ID LISTS AND COMPUTE CLUSTERS + // - // Convert to value channel - sequence_type_val = ssn_data.sequence_type.map { it.trim() } + ssn_data = get_ssn_id_info(ssn_file) compute_info = compute_clusters(ssn_data.edgelist, ssn_data.index_seqid_map) + // Convert the sequence type to a value channel + sequence_type_val = ssn_data.sequence_type.map { it.trim() } + id_list_data = get_id_list(compute_info.cluster_id_map, compute_info.singletons, ssn_data.seqid_source_map, sequence_type_val) + // Get the ID lists that will be used, e.g. the original sequences from the SSN input_seq_type_id_lists = get_selected_id_lists(id_list_data.uniprot_dir, id_list_data.uniref90_dir, id_list_data.uniref50_dir) + // + // STEP 2: OBTAIN THE FASTA FILES + // + + // Get the cluster IDs from the file names cluster_id_lists = input_seq_type_id_lists .flatten() .filter { file -> !file.name.contains("_All") } // Don"t include the file with all sequences in the analysis @@ -99,10 +116,15 @@ workflow { return tuple(clean_id, file) } + // Get FASTA files; the return value is a channel of tuples of [clusterId, file] cluster_fasta = get_fasta_files(cluster_id_lists) - // cluster_fasta is a channel of tuples of [clusterId, file] + // + // STEP 3: RUN THE EST-BASED ALL-BY-ALL WORKFLOW // Run a workflow that is nearly identical to the ALL_BY_ALL workflow in the est pipeline. + // See the workflow for more information on how this works. + // + reduced_fasta = condense_redundant(cluster_fasta) blast_databases = create_blast_db(reduced_fasta.fasta_file) @@ -113,15 +135,26 @@ workflow { blast_input = blast_databases.combine(fasta_shards.transpose(), by: 0) - blast_fractions = all_by_all_blast( blast_input ).groupTuple() + blast_fractions = all_by_all_blast(blast_input).groupTuple() reduced_blast_parquet = blastreduce(blast_fractions.join(fasta_lengths_parquet)) // Expand redundant sequences after BLAST computation (formerly known as demultiplex) blast_parquet = restore_condensed(reduced_blast_parquet.join(reduced_fasta.condensed)) - stats_files = compute_conv_ratio(blast_parquet.combine(fasta_lengths_parquet, by: 0)) + // + // STEP 4: COMPUTE CONVERGENCE RATIOS + // + + // Compute the convergence ratio based on the edges and nodes in each cluster + cr_table = get_conv_ratio_table(ssn_data.edgelist, ssn_data.index_seqid_map, compute_info.cluster_id_map, ssn_data.seqid_source_map) + + // Compute the convergence ratio based on the BLAST results + //stats_files = compute_blast_conv_ratio(blast_parquet.combine(fasta_lengths_parquet, by: 0)) + //stats_files = compute_blast_conv_ratio(reduced_blast_parquet.combine(fasta_lengths_parquet, by: 0)) + stats_files = compute_blast_conv_ratio(reduced_blast_parquet.join(fasta_lengths_parquet)) - merge_conv_ratios(stats_files.collect()) + // Merge the data for all the clusters into one file + merge_conv_ratios(stats_files.collect(), cr_table) } From c3312c7eec40b5c759cb25158bfb932dc9a34bc7 Mon Sep 17 00:00:00 2001 From: Nils Oberg Date: Wed, 8 Apr 2026 15:06:45 -0500 Subject: [PATCH 07/16] Add extra sig fig to color SSN convergence ratio output format --- pipelines/shared/perl/compute_conv_ratio.pl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/shared/perl/compute_conv_ratio.pl b/pipelines/shared/perl/compute_conv_ratio.pl index 71ab562c..55a68c76 100644 --- a/pipelines/shared/perl/compute_conv_ratio.pl +++ b/pipelines/shared/perl/compute_conv_ratio.pl @@ -115,7 +115,7 @@ sub computeConvRatio { my $denom = $numIds * ($numIds - 1); my $convRatio = 0; $convRatio = $numDegree / $denom if $denom > 0; - $convRatio = sprintf("%.1e", $convRatio); + $convRatio = sprintf("%.2e", $convRatio); push @data, [$cnum, $convRatio, $numNodes, $numIds, $numDegree / 2]; } From b1997ec1d2596e877bc86fafd84619df738d45b9 Mon Sep 17 00:00:00 2001 From: Nils Oberg Date: Wed, 8 Apr 2026 15:07:44 -0500 Subject: [PATCH 08/16] Add SSN cluster-based convergence ratio into output --- pipelines/est/statistics/merge_conv_ratios.py | 212 ++++++++++++------ 1 file changed, 147 insertions(+), 65 deletions(-) diff --git a/pipelines/est/statistics/merge_conv_ratios.py b/pipelines/est/statistics/merge_conv_ratios.py index 34972159..d62f47cd 100644 --- a/pipelines/est/statistics/merge_conv_ratios.py +++ b/pipelines/est/statistics/merge_conv_ratios.py @@ -1,103 +1,185 @@ import argparse -import json import glob -import sys -import os +import json import math +import os import re +import sys +from typing import Any, Dict, List + +def create_parser() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Merge individual convergence ratio JSON files into a single tab-separated file.") + parser.add_argument('--ssn-conv-ratio', required=True, help="Path to the SSN cluster-based convergence ratio file from the colorssn shared workflow.") + parser.add_argument('--output', required=True, help="Path to the output .tab file") + parser.add_argument('--stats', nargs='*', help="List of input JSON files. If not provided, globs *_conv_ratio.json in the current directory.") + return parser -def main(): - parser = argparse.ArgumentParser( - description="Merge individual convergence ratio JSON files into a single tab-separated file." - ) - - parser.add_argument( - '--output', - required=True, - help="Path to the output .tab file" - ) - parser.add_argument( - '--stats', - nargs='*', - help="List of input JSON files. If not provided, globs *_conv_ratio.json in the current directory." - ) - - args = parser.parse_args() - - input_files = args.stats if args.stats else glob.glob('*_conv_ratio.json') - - if not input_files: +def check_args(args: argparse.Namespace) -> argparse.Namespace: + if not args.stats: print("Warning: No input JSON files found.", file=sys.stderr) - - # Read all data into a dictionary first - data_dict = {} + exit(1) + return args + +def load_blast_conv_files(input_files: List) -> Dict[str, Dict[str, str]]: + """ + Load the BLAST-based convergence ratios from the given files. + + Parameters + ---------- + input_files + A list of paths to JSON files containing convergence ratio data, one per cluster + + Returns + ------- + A dictionary of dictionaries, mapping cluster ID to data for that cluster + """ + + blast_data = {} for json_file in input_files: filename = os.path.basename(json_file) - + # Extract the cluster number match = re.search(r'[cC]luster_(\d+)_conv_ratio\.json', filename) if match: cluster_id = match.group(1) else: cluster_id = filename.replace('_conv_ratio.json', '') - + try: - with open(json_file, 'r') as f_in: - data = json.load(f_in) - - ratio_val = data.get('convergence_ratio', 'NA') - edges_val = data.get('num_blast_edges', 'NA') - nodes_val = data.get('num_unique_ids', 'NA') - - if ratio_val != 'NA': - ratio_val = float(ratio_val) - - data_dict[cluster_id] = { - 'ratio': ratio_val, - 'edges': edges_val, - 'nodes': nodes_val + with open(json_file, 'r') as fh: + data = json.load(fh) + + conv_ratio = data.get('convergence_ratio', 'NA') + edges = data.get('num_blast_edges', 'NA') + nodes = data.get('num_unique_ids', 'NA') + + if conv_ratio != 'NA': + conv_ratio = float(conv_ratio) + + blast_data[cluster_id] = { + 'ratio': conv_ratio, + 'edges': edges, + 'nodes': nodes } except Exception as e: print(f"Error processing file {json_file}: {e}", file=sys.stderr) - # Calculate global min, max, and formatting digits + return blast_data + +def load_ssn_conv_data(data_file: str) -> Dict[str, Dict[str, str]]: + """ + Load the convergence ratio data from the SSN-based table. + + Parameters + ---------- + data_file + Path to the SSN-based convergence ratio table file + + Returns + ------- + A dictionary of dictionaries, mapping cluster ID to data for that cluster + """ + + ssn_data = {} + try: + with open(data_file, 'r') as fh: + header = next(fh) + + for line in fh: + #Cluster Number Convergence Ratio Number of SSN Nodes Number of UniProt IDs Number of Edges + cluster_num, conv_ratio, num_nodes, num_ids, num_edges = re.split(r'\t', line.strip()) + ssn_data[cluster_num] = { + 'ratio': conv_ratio, + 'edges': num_edges, + 'nodes': num_nodes, + } + + except IOError as e: + print(f"Error reading SSN convergence ratio data file {data_file}: {e}", file=sys.stderr) + sys.exit(1) + + return ssn_data + +def calc_sig_figs(blast_conv_data: Dict[Any, Any]) -> int: + """ + Compute the number of significant figures that are used for formatting. + + Parameters + ---------- + blast_conv_data + Dictionary of dictionaries, mapping cluster ID to data + + Returns + ------- + Int, representing the number of digits to use when formatting the output convergence ratio + """ + # Extract only the valid floats from our nested dictionaries - valid_ratios = [v['ratio'] for v in data_dict.values() if isinstance(v['ratio'], float)] - + valid_ratios = [v['ratio'] for v in blast_conv_data.values() if isinstance(v['ratio'], float)] + digits = 2 if valid_ratios: max_val = max(valid_ratios) min_val = min(valid_ratios) diff = max_val - min_val - + if diff > 1e10: digits = -int(math.log(diff) - 0.5) + 2 - + digits = max(0, digits) - - # Output the data to a tab-separated file + + return digits + +def save_merged_data(output: str, blast_conv_data: Dict[Any, Any], ssn_conv_data: Dict[Any, Any], digits: int): + """ + Merge the individual convergence ratio data into one file. + + Parameters + ---------- + output + Path to the output file + blast_conv_data + Dictionary of dictionaries, mapping cluster ID to BLAST-based data + ssn_conv_data + Dictionary of dictionaries, mapping cluster ID to SSN-based data + digits: + The number of digits to use when formatting the output convergence ratio + """ + try: - with open(args.output, 'w') as f_out: + with open(output, 'w') as f_out: f_out.write("\t".join(["Cluster Number", "Convergence Ratio", "Number of IDs", "Number of BLAST Matches", "SSN Cluster Convergence Ratio", "Number of Nodes", "Number of Edges"]) + "\n") # Iterate through the dictionary and write the formatted rows - for cluster_id, metrics in data_dict.items(): - ratio_val = metrics['ratio'] - edges_val = metrics['edges'] - nodes_val = metrics['nodes'] - - if isinstance(ratio_val, float): - formatted_ratio = f"{ratio_val:.{digits}e}" - else: - formatted_ratio = str(ratio_val) - + for cluster_id, metrics in blast_conv_data.items(): + blast_conv_ratio = metrics['ratio'] + blast_conv_ratio_fmt = f"{blast_conv_ratio:.{digits}e}" if isinstance(blast_conv_ratio, float) else str(blast_conv_ratio) + + ssn_metrics = ssn_conv_data.get(cluster_id) + ssn_conv_ratio = ssn_metrics['ratio'] + ssn_conv_ratio_fmt = f"{ssn_conv_ratio:.{digits}e}" if isinstance(ssn_conv_ratio, float) else str(ssn_conv_ratio) + # Write all four columns separated by tabs - f_out.write("\t".join([cluster_id, formatted_ratio, str(edges_val), str(nodes_val), ]) + "\n") - + f_out.write("\t".join([cluster_id, blast_conv_ratio_fmt, str(metrics['nodes']), str(metrics['edges']), ssn_conv_ratio_fmt, str(ssn_metrics['nodes']), str(ssn_metrics['edges'])]) + "\n") + + except KeyError as e: + print(f"Unable to find cluster ID {e.args[0]} in SSN convergence ratio data", file=sys.stderr) + sys.exit(1) + except IOError as e: - print(f"Error writing to output file {args.output}: {e}", file=sys.stderr) + print(f"Error writing to output file {output}: {e}", file=sys.stderr) sys.exit(1) if __name__ == "__main__": - main() + args = check_args(create_parser().parse_args()) + + blast_conv_data = load_blast_conv_files(args.stats) + + ssn_conv_data = load_ssn_conv_data(args.ssn_conv_ratio) + + digits = calc_sig_figs(blast_conv_data) + + # Output the data to a tab-separated file + save_merged_data(args.output, blast_conv_data, ssn_conv_data, digits) + From 32b8b918b29a28c59e9514f8c9a75300f233660f Mon Sep 17 00:00:00 2001 From: Nils Oberg Date: Wed, 8 Apr 2026 15:08:25 -0500 Subject: [PATCH 09/16] Format multi-line Nextflow process script lines for consistency --- pipelines/shared/nextflow/color_workflow.nf | 37 +++++++++++++++------ 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/pipelines/shared/nextflow/color_workflow.nf b/pipelines/shared/nextflow/color_workflow.nf index 3a4fe545..49ea9d3a 100644 --- a/pipelines/shared/nextflow/color_workflow.nf +++ b/pipelines/shared/nextflow/color_workflow.nf @@ -120,9 +120,14 @@ process get_annotated_mapping_tables { script: """ - perl $projectDir/../shared/perl/annotate_mapping_table.pl --seqid-source-map $seqid_source_map --cluster-map $cluster_id_map \ - --cluster-color-map $cluster_color_map --mapping-table mapping_table.txt --swissprot-table swissprot_clusters_desc.txt \ - --config ${params.efi_config} --db-name ${params.efi_db} + perl $projectDir/../shared/perl/annotate_mapping_table.pl \ + --seqid-source-map $seqid_source_map \ + --cluster-map $cluster_id_map \ + --cluster-color-map $cluster_color_map \ + --mapping-table mapping_table.txt \ + --swissprot-table swissprot_clusters_desc.txt \ + --config ${params.efi_config} \ + --db-name ${params.efi_db} """ } @@ -141,8 +146,12 @@ process get_conv_ratio_table { script: """ - perl $projectDir/../shared/perl/compute_conv_ratio.pl --cluster-map $cluster_id_map --index-seqid-map $index_seqid_map \ - --edgelist $edgelist --seqid-source-map $seqid_source_map --conv-ratio conv_ratio.txt + perl $projectDir/../shared/perl/compute_conv_ratio.pl \ + --cluster-map $cluster_id_map \ + --index-seqid-map $index_seqid_map \ + --edgelist $edgelist \ + --seqid-source-map $seqid_source_map \ + --conv-ratio conv_ratio.txt """ } @@ -158,8 +167,11 @@ process get_cluster_stats { script: """ - perl $projectDir/../shared/perl/compute_stats.pl --cluster-map $cluster_id_map --seqid-source-map $seqid_source_map \ - --singletons $singletons --stats color_workflow_stats.json + perl $projectDir/../shared/perl/compute_stats.pl \ + --cluster-map $cluster_id_map \ + --seqid-source-map $seqid_source_map \ + --singletons $singletons \ + --stats color_workflow_stats.json """ } @@ -178,8 +190,12 @@ process compute_clusters { script: """ - python $projectDir/../shared/python/compute_clusters.py --edgelist $edgelist --index-seqid-map $index_seqid_map \ - --clusters cluster_id_map.txt --singletons singletons.txt --cluster-num-map cluster_num_map.txt + python $projectDir/../shared/python/compute_clusters.py \ + --edgelist $edgelist \ + --index-seqid-map $index_seqid_map \ + --clusters cluster_id_map.txt \ + --singletons singletons.txt \ + --cluster-num-map cluster_num_map.txt """ } @@ -193,7 +209,8 @@ process assign_cluster_colors { script: """ - perl $projectDir/../shared/perl/assign_cluster_colors.pl --cluster-num-map ${cluster_num_map} \ + perl $projectDir/../shared/perl/assign_cluster_colors.pl \ + --cluster-num-map ${cluster_num_map} \ --cluster-color-map cluster_colors.txt """ } From 72ccb722c6cb75e127302fcdeed5da9bd82597d9 Mon Sep 17 00:00:00 2001 From: Nils Oberg Date: Wed, 8 Apr 2026 15:32:48 -0500 Subject: [PATCH 10/16] Refactor EST all-by-all BLAST workflow - Move all process blocks into a shared Nextflow file - Needed by convergence ratio and est pipelines --- pipelines/est/subworkflows/all_by_all.nf | 113 +------------ .../render_prereduce_sql_template.py | 0 .../axa_blast/transcode_blast.py | 0 .../{est => shared}/blastreduce/README.md | 0 .../blastreduce/render_reduce_sql_template.py | 0 .../blastreduce/transcode_fasta_lengths.py | 0 .../condense/render_restore_sql_template.py | 0 .../condense/restore_condensed_sequences.py | 0 .../condense/transcode_restored_blast.py | 0 pipelines/shared/nextflow/blast.nf | 148 ++++++++++++++++++ .../templates/prereduce-template.sql | 0 .../templates/reduce-template.sql | 0 .../templates/restore-template.sql | 0 13 files changed, 149 insertions(+), 112 deletions(-) rename pipelines/{est => shared}/axa_blast/render_prereduce_sql_template.py (100%) rename pipelines/{est => shared}/axa_blast/transcode_blast.py (100%) rename pipelines/{est => shared}/blastreduce/README.md (100%) rename pipelines/{est => shared}/blastreduce/render_reduce_sql_template.py (100%) rename pipelines/{est => shared}/blastreduce/transcode_fasta_lengths.py (100%) rename pipelines/{est => shared}/condense/render_restore_sql_template.py (100%) rename pipelines/{est => shared}/condense/restore_condensed_sequences.py (100%) rename pipelines/{est => shared}/condense/transcode_restored_blast.py (100%) create mode 100644 pipelines/shared/nextflow/blast.nf rename pipelines/{est => shared}/templates/prereduce-template.sql (100%) rename pipelines/{est => shared}/templates/reduce-template.sql (100%) rename pipelines/{est => shared}/templates/restore-template.sql (100%) diff --git a/pipelines/est/subworkflows/all_by_all.nf b/pipelines/est/subworkflows/all_by_all.nf index 88c3b825..7493a2a6 100644 --- a/pipelines/est/subworkflows/all_by_all.nf +++ b/pipelines/est/subworkflows/all_by_all.nf @@ -1,116 +1,5 @@ -process create_blast_db { - input: - tuple val(fid), path(fasta_file) - - output: - tuple val(fid), path("database.*"), val("database") - - script: - """ - formatdb -i $fasta_file -n database -p T -o T - """ -} - -process all_by_all_blast { - input: - tuple val(fid), path(blast_db_files, arity: 5), val(blast_db_name), path(frac) - - output: - tuple val(fid), path("${frac}.tab.sorted.parquet") - - script: - """ - # run blast to get similarity metrics - blastall -p blastp -i $frac -d $blast_db_name -m 8 -e ${params.blast_evalue} -b ${params.blast_num_matches} -o ${frac}.tab - - # transcode to parquet for speed, creates frac.tab.parquet - python $projectDir/axa_blast/transcode_blast.py --blast-output ${frac}.tab - - # in each row, ensure that qseqid < sseqid lexicographically - DUCKDB_TEMP="${params.duckdb_temp_dir}/duckdb-${task.index}-"\$(date +%s) - python $projectDir/axa_blast/render_prereduce_sql_template.py --blast-output ${frac}.tab.parquet --sql-template $projectDir/templates/prereduce-template.sql --output-file ${frac}.tab.sorted.parquet --duckdb-memory-limit ${params.duckdb_memory_limit} --duckdb-temp-dir \${DUCKDB_TEMP} --sql-output-file prereduce.sql - duckdb < prereduce.sql - """ -} - -process blastreduce_transcode_fasta { - input: - tuple val(fid), path(fasta_file) - - output: - tuple val(fid), path("${fasta_file.getName()}.parquet") - - script: - """ - python $projectDir/blastreduce/transcode_fasta_lengths.py --fasta $fasta_file --output ${fasta_file.getName()}.parquet - """ -} - -process split_fasta { - input: - tuple val(fid), path(fasta_file) - - output: - tuple val(fid), path("fracfile-*.fa") - - script: - """ - perl $projectDir/split_fasta/split_fasta.pl -parts ${params.num_fasta_shards} -source ${fasta_file} - """ -} - -process blastreduce { - publishDir params.final_output_dir, mode: 'copy', enabled: !params.multiplex - - input: - tuple val(fid), path(blast_files), path(fasta_length_parquet) - - output: - tuple val(fid), path("1.out.parquet") - - script: - """ - DUCKDB_TEMP="${params.duckdb_temp_dir}/duckdb-${task.index}-"\$(date +%s) - python $projectDir/blastreduce/render_reduce_sql_template.py --blast-output $blast_files --sql-template $projectDir/templates/reduce-template.sql --fasta-length-parquet $fasta_length_parquet --duckdb-memory-limit ${params.duckdb_memory_limit} --duckdb-temp-dir \${DUCKDB_TEMP} --sql-output-file allreduce.sql - duckdb < allreduce.sql - """ -} - -// Formerly known as multiplex -process condense_redundant { - input: - tuple val(fid), path(fasta_file) - - output: - tuple val(fid), path("sequences.fasta"), emit: "fasta_file" - tuple val(fid), path("sequences.fasta.clstr"), emit: "condensed" - - script: - """ - cd-hit -d 0 -c 1 -s 1 -i ${fasta_file} -o sequences.fasta -M "${params.cdhit_memory_limit}" - """ -} - -// Formerly known as demultiplex -process restore_condensed { - publishDir params.final_output_dir, mode: 'copy', overwrite: true - - input: - tuple val(fid), path(blast_parquet, stageAs: "reduced.parquet"), path(condensed) - - output: - tuple val(fid), path("1.out.parquet") - - script: - """ - DUCKDB_TEMP="${params.duckdb_temp_dir}/duckdb-${task.index}-"\$(date +%s) - python $projectDir/condense/render_restore_sql_template.py --blast-parquet $blast_parquet --sql-template $projectDir/templates/restore-template.sql --duckdb-memory-limit ${params.duckdb_memory_limit} --duckdb-temp-dir \${DUCKDB_TEMP} --sql-output-file restore.sql - duckdb < restore.sql - python $projectDir/condense/restore_condensed_sequences.py --condensed-blast condensed.out --restored-blast 1.out --cd-hit-cluster ${condensed} - python $projectDir/condense/transcode_restored_blast.py --blast-output 1.out - """ -} +include { all_by_all_blast; blastreduce; blastreduce_transcode_fasta; condense_redundant; create_blast_db; restore_condensed; split_fasta } from "../shared/nextflow/blast.nf" workflow ALL_BY_ALL { take: diff --git a/pipelines/est/axa_blast/render_prereduce_sql_template.py b/pipelines/shared/axa_blast/render_prereduce_sql_template.py similarity index 100% rename from pipelines/est/axa_blast/render_prereduce_sql_template.py rename to pipelines/shared/axa_blast/render_prereduce_sql_template.py diff --git a/pipelines/est/axa_blast/transcode_blast.py b/pipelines/shared/axa_blast/transcode_blast.py similarity index 100% rename from pipelines/est/axa_blast/transcode_blast.py rename to pipelines/shared/axa_blast/transcode_blast.py diff --git a/pipelines/est/blastreduce/README.md b/pipelines/shared/blastreduce/README.md similarity index 100% rename from pipelines/est/blastreduce/README.md rename to pipelines/shared/blastreduce/README.md diff --git a/pipelines/est/blastreduce/render_reduce_sql_template.py b/pipelines/shared/blastreduce/render_reduce_sql_template.py similarity index 100% rename from pipelines/est/blastreduce/render_reduce_sql_template.py rename to pipelines/shared/blastreduce/render_reduce_sql_template.py diff --git a/pipelines/est/blastreduce/transcode_fasta_lengths.py b/pipelines/shared/blastreduce/transcode_fasta_lengths.py similarity index 100% rename from pipelines/est/blastreduce/transcode_fasta_lengths.py rename to pipelines/shared/blastreduce/transcode_fasta_lengths.py diff --git a/pipelines/est/condense/render_restore_sql_template.py b/pipelines/shared/condense/render_restore_sql_template.py similarity index 100% rename from pipelines/est/condense/render_restore_sql_template.py rename to pipelines/shared/condense/render_restore_sql_template.py diff --git a/pipelines/est/condense/restore_condensed_sequences.py b/pipelines/shared/condense/restore_condensed_sequences.py similarity index 100% rename from pipelines/est/condense/restore_condensed_sequences.py rename to pipelines/shared/condense/restore_condensed_sequences.py diff --git a/pipelines/est/condense/transcode_restored_blast.py b/pipelines/shared/condense/transcode_restored_blast.py similarity index 100% rename from pipelines/est/condense/transcode_restored_blast.py rename to pipelines/shared/condense/transcode_restored_blast.py diff --git a/pipelines/shared/nextflow/blast.nf b/pipelines/shared/nextflow/blast.nf new file mode 100644 index 00000000..d78813eb --- /dev/null +++ b/pipelines/shared/nextflow/blast.nf @@ -0,0 +1,148 @@ + +process all_by_all_blast { + input: + tuple val(fid), path(blast_db_files, arity: 5), val(blast_db_name), path(frac) + + output: + tuple val(fid), path("${frac}.tab.sorted.parquet") + + script: + """ + # run blast to get similarity metrics + blastall \ + -p blastp \ + -i $frac \ + -d $blast_db_name \ + -m 8 \ + -e ${params.blast_evalue} \ + -b ${params.blast_num_matches} \ + -o ${frac}.tab + + # transcode to parquet for speed, creates frac.tab.parquet + python $projectDir/../shared/axa_blast/transcode_blast.py --blast-output ${frac}.tab + + # in each row, ensure that qseqid < sseqid lexicographically + DUCKDB_TEMP="${params.duckdb_temp_dir}/duckdb-${task.index}-"\$(date +%s) + python $projectDir/../shared/axa_blast/render_prereduce_sql_template.py \ + --blast-output ${frac}.tab.parquet \ + --sql-template $projectDir/../shared/templates/prereduce-template.sql \ + --output-file ${frac}.tab.sorted.parquet \ + --duckdb-memory-limit ${params.duckdb_memory_limit} \ + --duckdb-temp-dir \${DUCKDB_TEMP} \ + --sql-output-file prereduce.sql + duckdb < prereduce.sql + """ +} + +process blastreduce { + publishDir params.final_output_dir, mode: 'copy', enabled: !params.multiplex + + input: + tuple val(fid), path(blast_files), path(fasta_length_parquet) + + output: + tuple val(fid), path("1.out.parquet") + + script: + """ + DUCKDB_TEMP="${params.duckdb_temp_dir}/duckdb-${task.index}-"\$(date +%s) + python $projectDir/../shared/blastreduce/render_reduce_sql_template.py \ + --blast-output $blast_files \ + --sql-template $projectDir/../shared/templates/reduce-template.sql \ + --fasta-length-parquet $fasta_length_parquet \ + --duckdb-memory-limit ${params.duckdb_memory_limit} \ + --duckdb-temp-dir \${DUCKDB_TEMP} \ + --sql-output-file allreduce.sql + duckdb < allreduce.sql + """ +} + +process blastreduce_transcode_fasta { + input: + tuple val(fid), path(fasta_file) + + output: + tuple val(fid), path("${fasta_file.getName()}.parquet") + + script: + """ + python $projectDir/../shared/blastreduce/transcode_fasta_lengths.py \ + --fasta $fasta_file \ + --output ${fasta_file.getName()}.parquet + """ +} + +// Formerly known as multiplex +process condense_redundant { + input: + tuple val(fid), path(fasta_file) + + output: + tuple val(fid), path("sequences.fasta"), emit: "fasta_file" + tuple val(fid), path("sequences.fasta.clstr"), emit: "condensed" + + script: + """ + cd-hit -d 0 -c 1 -s 1 -i ${fasta_file} -o sequences.fasta -M "${params.cdhit_memory_limit}" + """ +} + +process create_blast_db { + input: + tuple val(fid), path(fasta_file) + + output: + tuple val(fid), path("database.*"), val("database") + + script: + """ + formatdb \ + -i $fasta_file \ + -n database \ + -p T -o T + """ +} + +// Formerly known as demultiplex +process restore_condensed { + publishDir params.final_output_dir, mode: 'copy', overwrite: true + + input: + tuple val(fid), path(blast_parquet, stageAs: "reduced.parquet"), path(condensed) + + output: + tuple val(fid), path("1.out.parquet") + + script: + """ + DUCKDB_TEMP="${params.duckdb_temp_dir}/duckdb-${task.index}-"\$(date +%s) + python $projectDir/../shared/condense/render_restore_sql_template.py \ + --blast-parquet $blast_parquet \ + --sql-template $projectDir/../shared/templates/restore-template.sql \ + --duckdb-memory-limit ${params.duckdb_memory_limit} \ + --duckdb-temp-dir \${DUCKDB_TEMP} \ + --sql-output-file restore.sql + duckdb < restore.sql + python $projectDir/../shared/condense/restore_condensed_sequences.py \ + --condensed-blast condensed.out \ + --restored-blast 1.out \ + --cd-hit-cluster ${condensed} + python $projectDir/../shared/condense/transcode_restored_blast.py \ + --blast-output 1.out + """ +} + +process split_fasta { + input: + tuple val(fid), path(fasta_file) + + output: + tuple val(fid), path("parts/*.fasta") + + script: + """ + mkdir parts + seqkit split2 ${fasta_file} -p ${params.num_fasta_shards} --out-dir parts + """ +} + diff --git a/pipelines/est/templates/prereduce-template.sql b/pipelines/shared/templates/prereduce-template.sql similarity index 100% rename from pipelines/est/templates/prereduce-template.sql rename to pipelines/shared/templates/prereduce-template.sql diff --git a/pipelines/est/templates/reduce-template.sql b/pipelines/shared/templates/reduce-template.sql similarity index 100% rename from pipelines/est/templates/reduce-template.sql rename to pipelines/shared/templates/reduce-template.sql diff --git a/pipelines/est/templates/restore-template.sql b/pipelines/shared/templates/restore-template.sql similarity index 100% rename from pipelines/est/templates/restore-template.sql rename to pipelines/shared/templates/restore-template.sql From 8390562c64fdb4f299aafcd6664e6900876de35c Mon Sep 17 00:00:00 2001 From: Nils Oberg Date: Wed, 8 Apr 2026 16:27:58 -0500 Subject: [PATCH 11/16] Refactor convergence ratio utility from est to separate pipeline --- bin/create_conv_ratio_nextflow_params.py | 2 +- .../convergenceratio.nf | 4 +- .../statistics/merge_conv_ratios.py | 0 pipelines/est/subworkflows/all_by_all.nf | 2 +- pipelines/est/subworkflows/reporting.nf | 41 ++++++++++++++++--- .../{est => shared}/statistics/conv_ratio.py | 0 6 files changed, 39 insertions(+), 10 deletions(-) rename pipelines/{est => convergenceratio}/convergenceratio.nf (98%) rename pipelines/{est => convergenceratio}/statistics/merge_conv_ratios.py (100%) rename pipelines/{est => shared}/statistics/conv_ratio.py (100%) diff --git a/bin/create_conv_ratio_nextflow_params.py b/bin/create_conv_ratio_nextflow_params.py index 3996588c..914cdf08 100755 --- a/bin/create_conv_ratio_nextflow_params.py +++ b/bin/create_conv_ratio_nextflow_params.py @@ -7,7 +7,7 @@ import shared_args -NXF_SCRIPT = "pipelines/est/convergenceratio.nf" +NXF_SCRIPT = "pipelines/convergenceratio/convergenceratio.nf" def add_args(parser: argparse.ArgumentParser): """ diff --git a/pipelines/est/convergenceratio.nf b/pipelines/convergenceratio/convergenceratio.nf similarity index 98% rename from pipelines/est/convergenceratio.nf rename to pipelines/convergenceratio/convergenceratio.nf index 5300e3af..def71a40 100644 --- a/pipelines/est/convergenceratio.nf +++ b/pipelines/convergenceratio/convergenceratio.nf @@ -1,5 +1,5 @@ -include { all_by_all_blast; blastreduce; blastreduce_transcode_fasta; condense_redundant; create_blast_db; restore_condensed; split_fasta } from "../est/subworkflows/all_by_all.nf" +include { all_by_all_blast; blastreduce; blastreduce_transcode_fasta; condense_redundant; create_blast_db; restore_condensed; split_fasta } from "../shared/nextflow/blast.nf" include { unzip_ssn } from "../shared/nextflow/util.nf" include { compute_clusters; get_conv_ratio_table; get_id_list; get_ssn_id_info } from "../shared/nextflow/color_workflow.nf" @@ -12,7 +12,7 @@ process compute_blast_conv_ratio { script: """ - python $projectDir/statistics/conv_ratio.py \ + python $projectDir/../shared/statistics/conv_ratio.py \ --blast-output ${blast_parquet} \ --fasta ${fasta_file} \ --output "${cluster_id}_conv_ratio.json" diff --git a/pipelines/est/statistics/merge_conv_ratios.py b/pipelines/convergenceratio/statistics/merge_conv_ratios.py similarity index 100% rename from pipelines/est/statistics/merge_conv_ratios.py rename to pipelines/convergenceratio/statistics/merge_conv_ratios.py diff --git a/pipelines/est/subworkflows/all_by_all.nf b/pipelines/est/subworkflows/all_by_all.nf index 7493a2a6..30a70f63 100644 --- a/pipelines/est/subworkflows/all_by_all.nf +++ b/pipelines/est/subworkflows/all_by_all.nf @@ -1,5 +1,5 @@ -include { all_by_all_blast; blastreduce; blastreduce_transcode_fasta; condense_redundant; create_blast_db; restore_condensed; split_fasta } from "../shared/nextflow/blast.nf" +include { all_by_all_blast; blastreduce; blastreduce_transcode_fasta; condense_redundant; create_blast_db; restore_condensed; split_fasta } from "../../shared/nextflow/blast.nf" workflow ALL_BY_ALL { take: diff --git a/pipelines/est/subworkflows/reporting.nf b/pipelines/est/subworkflows/reporting.nf index 876265ab..08474775 100644 --- a/pipelines/est/subworkflows/reporting.nf +++ b/pipelines/est/subworkflows/reporting.nf @@ -13,11 +13,21 @@ process compute_stats { path "conv_ratio.json", emit: stats """ # compute convergence ratio - python $projectDir/statistics/conv_ratio.py --blast-output $blast_parquet --fasta $fasta_file --output conv_ratio.json + python $projectDir/../shared/statistics/conv_ratio.py \ + --blast-output $blast_parquet \ + --fasta $fasta_file \ + --output conv_ratio.json # compute boxplot stats and evalue.tab DUCKDB_TEMP="${params.duckdb_temp_dir}/duckdb-${task.index}-"\$(date +%s) - python $projectDir/statistics/render_boxplotstats_sql_template.py --blast-output $blast_parquet --duckdb-memory-limit ${params.duckdb_memory_limit} --duckdb-temp-dir \${DUCKDB_TEMP} --boxplot-stats-output boxplot_stats.parquet --evalue-output evalue.tab --sql-template $projectDir/templates/boxplotstats-template.sql --sql-output-file boxplotstats.sql + python $projectDir/statistics/render_boxplotstats_sql_template.py \ + --blast-output $blast_parquet \ + --duckdb-memory-limit ${params.duckdb_memory_limit} \ + --duckdb-temp-dir \${DUCKDB_TEMP} \ + --boxplot-stats-output boxplot_stats.parquet \ + --evalue-output evalue.tab \ + --sql-template $projectDir/templates/boxplotstats-template.sql \ + --sql-output-file boxplotstats.sql duckdb < boxplotstats.sql """ } @@ -30,8 +40,18 @@ process visualize_boxplot_stats { path '*.json', emit: json path '*.png', emit: plots """ - python $projectDir/visualization/plot_blast_results.py --boxplot-stats $boxplot_stats --job-id ${params.job_id} --length-plot-filename alignment_length --pident-plot-filename percent_identity --edge-hist-filename number_of_edges --proxies sm:48 - python $projectDir/visualization/export_blast_results_plot_json.py --boxplot-stats $boxplot_stats --length-json-filename alignment_length.json --pident-json-filename percent_identity.json --edge-hist-json-filename number_of_edges.json + python $projectDir/visualization/plot_blast_results.py \ + --boxplot-stats $boxplot_stats \ + --job-id ${params.job_id} \ + --length-plot-filename alignment_length \ + --pident-plot-filename percent_identity \ + --edge-hist-filename number_of_edges \ + --proxies sm:48 + python $projectDir/visualization/export_blast_results_plot_json.py \ + --boxplot-stats $boxplot_stats \ + --length-json-filename alignment_length.json \ + --pident-json-filename percent_identity.json \ + --edge-hist-json-filename number_of_edges.json """ } @@ -44,8 +64,17 @@ process visualize_length_histograms { path '*.png', emit: plots """ base_name=\$(basename "${length_histogram_file}" .histogram.txt) - python $projectDir/../shared/python/plot_length_data.py --lengths $length_histogram_file --job-id ${params.job_id} --frac 1 --plot-filename length_histogram_\${base_name} --output-type png --proxies sm:48 - python $projectDir/visualization/export_length_histogram_json.py --lengths $length_histogram_file --frac 1 --output-json-filename length_histogram_\${base_name}.json + python $projectDir/../shared/python/plot_length_data.py \ + --lengths $length_histogram_file \ + --job-id ${params.job_id} \ + --frac 1 \ + --plot-filename length_histogram_\${base_name} \ + --output-type png \ + --proxies sm:48 + python $projectDir/visualization/export_length_histogram_json.py \ + --lengths $length_histogram_file \ + --frac 1 \ + --output-json-filename length_histogram_\${base_name}.json """ } diff --git a/pipelines/est/statistics/conv_ratio.py b/pipelines/shared/statistics/conv_ratio.py similarity index 100% rename from pipelines/est/statistics/conv_ratio.py rename to pipelines/shared/statistics/conv_ratio.py From c73d9fa15653c233589046172e2cbf130f954796 Mon Sep 17 00:00:00 2001 From: Nils Oberg Date: Wed, 8 Apr 2026 16:50:13 -0500 Subject: [PATCH 12/16] Restructure EST import utilities for more idomatic sharing between workflows - Fix path import issues - Move get_sequence_ids and get_sequences from pipelines/shared/perl into pipelines/shared/import to better reflect organization --- conf/docker.config | 2 +- conf/singularity.config | 2 +- .../convergenceratio/convergenceratio.nf | 2 +- pipelines/est/import/math.pl | 39 --------- pipelines/est/subworkflows/import.nf | 80 +------------------ pipelines/gnd/gnd.nf | 6 +- .../{est => shared}/import/filter_ids.pl | 0 .../{perl => import}/get_sequence_ids.pl | 0 .../shared/{perl => import}/get_sequences.pl | 0 .../import/get_sunburst_data.pl | 0 pipelines/shared/nextflow/color_workflow.nf | 2 +- pipelines/shared/nextflow/sequence.nf | 78 +++++++++++++++++- pipelines/taxonomy/taxonomy.nf | 4 +- 13 files changed, 88 insertions(+), 127 deletions(-) delete mode 100644 pipelines/est/import/math.pl rename pipelines/{est => shared}/import/filter_ids.pl (100%) rename pipelines/shared/{perl => import}/get_sequence_ids.pl (100%) rename pipelines/shared/{perl => import}/get_sequences.pl (100%) rename pipelines/{est => shared}/import/get_sunburst_data.pl (100%) diff --git a/conf/docker.config b/conf/docker.config index 9a51880a..336c7174 100644 --- a/conf/docker.config +++ b/conf/docker.config @@ -10,7 +10,7 @@ process { cpus = 1 memory = 5.GB container = 'enzymefunctioninitiative/efi-est:latest' - containerOptions = "-v $projectDir:$projectDir -v $projectDir/../shared:$projectDir/../shared -v $projectDir/../est:$projectDir/../est -v $projectDir/../../lib:$projectDir/../../lib -v $EFI_DATA_DIR:$EFI_DATA_DIR" + containerOptions = "-v $projectDir:$projectDir -v $projectDir/../shared:$projectDir/../shared -v $projectDir/../../lib:$projectDir/../../lib -v $EFI_DATA_DIR:$EFI_DATA_DIR" withName: blastreduce { memory = 20.GB diff --git a/conf/singularity.config b/conf/singularity.config index c8065b9f..e91dff8c 100644 --- a/conf/singularity.config +++ b/conf/singularity.config @@ -10,7 +10,7 @@ process { cpus = 1 memory = 5.GB container = 'enzymefunctioninitiative/efi-est:latest' - containerOptions = "--bind $projectDir:$projectDir --bind $projectDir/../shared:$projectDir/../shared --bind $projectDir/../est:$projectDir/../est --bind $projectDir/../../:$projectDir/../../ --bind $projectDir/../../lib:$projectDir/../../lib --bind $EFI_DATA_DIR:$EFI_DATA_DIR" + containerOptions = "--bind $projectDir:$projectDir --bind $projectDir/../shared:$projectDir/../shared --bind $projectDir/../../:$projectDir/../../ --bind $projectDir/../../lib:$projectDir/../../lib --bind $EFI_DATA_DIR:$EFI_DATA_DIR" withName: 'color_ssn|create_gnd|create_gnns|color_gnt_ssn|compute_clusters' { memory = 10.GB diff --git a/pipelines/convergenceratio/convergenceratio.nf b/pipelines/convergenceratio/convergenceratio.nf index def71a40..509ab5f9 100644 --- a/pipelines/convergenceratio/convergenceratio.nf +++ b/pipelines/convergenceratio/convergenceratio.nf @@ -47,7 +47,7 @@ process get_fasta_files { script: """ - perl $projectDir/../shared/perl/get_sequences.pl \ + perl $projectDir/../shared/import/get_sequences.pl \ --fasta-db ${params.fasta_db} \ --sequence-ids-file ${id_file} \ --output-sequence-file "${id_file.baseName}.fasta" diff --git a/pipelines/est/import/math.pl b/pipelines/est/import/math.pl deleted file mode 100644 index a5fe619e..00000000 --- a/pipelines/est/import/math.pl +++ /dev/null @@ -1,39 +0,0 @@ -=pod - -=head3 add - -=over 4 - -=item Summary - -Add two numbers and return the result - -=item Parameters - -=over 4 - -=item C (I) - -the first number - -=item C (I) - -the second number - -=back - -=item Returns - -The sum of C and C - -=back -=cut -sub add { - $a, $b = @_; - $c = $a + $b; - return $c; -} - -if ($ARGV[0] eq "--help") { - print "usage text\n"; -} \ No newline at end of file diff --git a/pipelines/est/subworkflows/import.nf b/pipelines/est/subworkflows/import.nf index cd26f2e0..492ca03a 100644 --- a/pipelines/est/subworkflows/import.nf +++ b/pipelines/est/subworkflows/import.nf @@ -1,81 +1,5 @@ -include { get_sequences } from "../../shared/nextflow/sequence.nf" - -process get_source_ids { - publishDir params.final_output_dir, mode: 'copy' - output: - path 'source_ids.tab', emit: 'source_ids' - path 'source_seq.tab', emit: 'source_meta' - path 'source_stats.json', emit: 'source_stats' - path 'blast_hits.tab', optional: true - path 'seq_mapping.tab', emit: 'seq_mapping', optional: true - path 'unmatched_id.tab', optional: true - script: - - common_args = "--efi-config ${params.efi_config} --efi-db ${params.efi_db} --mode ${params.import_mode} --sequence-version ${params.sequence_version}" - - family_args = "" - if (params.families) { - family_args = "--family " + params.families - } - - if (params.domain) { - family_args = family_args + " --domain " + params.domain_region - if (params.domain_family) { - family_args = family_args + " --domain-family " + params.domain_family - } - } - - if (params.import_mode == "blast") { - // blast_hits.tab is provided as an output to the user - """ - blastall -p blastp -i ${params.input_file} -d ${params.import_blast_fasta_db} -m 8 -e ${params.import_blast_evalue} -b ${params.import_blast_num_matches} -o init_blast.out - if [[ -s init_blast.out ]]; then - awk '! /^#/ {print \$2"\t"\$11}' init_blast.out | sort -k2nr > blast_hits.tab - else - echo "BLAST did not return any matches. Verify that the sequence is a protein and not a nucleotide sequence." - exit 1 - fi - perl $projectDir/../shared/perl/get_sequence_ids.pl $common_args $family_args --blast-output init_blast.out --blast-query ${params.input_file} - """ - } else if (params.import_mode == "accessions") { - """ - perl $projectDir/../shared/perl/get_sequence_ids.pl $common_args $family_args --accessions ${params.input_file} - """ - } else if (params.import_mode == "fasta") { - """ - perl $projectDir/../shared/perl/get_sequence_ids.pl $common_args $family_args --fasta ${params.input_file} --sequence-mapping-file seq_mapping.tab - """ - } else if (params.import_mode == "family") { - """ - perl $projectDir/../shared/perl/get_sequence_ids.pl $common_args $family_args - """ - } else { - error "Mode '${params.import_mode}' not yet implemented" - } -} - -process filter_ids { - publishDir params.final_output_dir, mode: 'copy' - input: - path source_ids // table of all sequence IDs, including UniRef IDs - path source_meta // sequence metdata - path source_stats // statistics of source import process - output: - path 'accession_table.tab', emit: 'accession_table' // table of all sequence IDs, including UniRef IDs, filtered - path 'sequence_metadata.tab', emit: 'sequence_metadata' // sequence metdata in metadata format - path 'import_stats.json', emit: 'import_stats' // final statistics of source and filter import processes - path 'retrieval_ids.tab', emit: 'retrieval_ids' // list of IDs that came from the database, as opposed to user-specified FASTA files, including domain data - script: - filter_args = "" - if (params.filter) { - filter_args = params.filter.join(" --filter ") - filter_args = "--filter ${filter_args}" - } - """ - perl $projectDir/../est/import/filter_ids.pl --efi-config ${params.efi_config} --efi-db ${params.efi_db} --sequence-version ${params.sequence_version} $filter_args - """ -} +include { filter_ids; get_sequences; get_source_ids } from "../../shared/nextflow/sequence.nf" process get_sunburst_data { publishDir params.final_output_dir, mode: 'copy' @@ -86,7 +10,7 @@ process get_sunburst_data { path 'sunburst_tax.json' script: """ - perl $projectDir/../est/import/get_sunburst_data.pl --efi-config ${params.efi_config} --efi-db ${params.efi_db} + perl $projectDir/../shared/import/get_sunburst_data.pl --efi-config ${params.efi_config} --efi-db ${params.efi_db} """ } diff --git a/pipelines/gnd/gnd.nf b/pipelines/gnd/gnd.nf index 59ca897e..ea262ba6 100644 --- a/pipelines/gnd/gnd.nf +++ b/pipelines/gnd/gnd.nf @@ -54,7 +54,7 @@ process parse_blast_results_for_ids { script: """ - perl ${projectDir}/../shared/perl/get_sequence_ids.pl \ + perl $projectDir/../shared/import/get_sequence_ids.pl \ --efi-config "${params.efi_config}" \ --efi-db "${params.efi_db}" \ --sequence-version ${params.sequence_version} \ @@ -75,7 +75,7 @@ process parse_fasta_for_ids { script: """ - perl $projectDir/../shared/perl/get_sequence_ids.pl \ + perl $projectDir/../shared/import/get_sequence_ids.pl \ --efi-config "${params.efi_config}" \ --efi-db "${params.efi_db}" \ --sequence-version ${params.sequence_version} \ @@ -95,7 +95,7 @@ process parse_accession_for_ids { script: """ - perl $projectDir/../shared/perl/get_sequence_ids.pl \ + perl $projectDir/../shared/import/get_sequence_ids.pl \ --efi-config "${params.efi_config}" \ --efi-db "${params.efi_db}" \ --sequence-version ${params.sequence_version} \ diff --git a/pipelines/est/import/filter_ids.pl b/pipelines/shared/import/filter_ids.pl similarity index 100% rename from pipelines/est/import/filter_ids.pl rename to pipelines/shared/import/filter_ids.pl diff --git a/pipelines/shared/perl/get_sequence_ids.pl b/pipelines/shared/import/get_sequence_ids.pl similarity index 100% rename from pipelines/shared/perl/get_sequence_ids.pl rename to pipelines/shared/import/get_sequence_ids.pl diff --git a/pipelines/shared/perl/get_sequences.pl b/pipelines/shared/import/get_sequences.pl similarity index 100% rename from pipelines/shared/perl/get_sequences.pl rename to pipelines/shared/import/get_sequences.pl diff --git a/pipelines/est/import/get_sunburst_data.pl b/pipelines/shared/import/get_sunburst_data.pl similarity index 100% rename from pipelines/est/import/get_sunburst_data.pl rename to pipelines/shared/import/get_sunburst_data.pl diff --git a/pipelines/shared/nextflow/color_workflow.nf b/pipelines/shared/nextflow/color_workflow.nf index 49ea9d3a..16f6b02b 100644 --- a/pipelines/shared/nextflow/color_workflow.nf +++ b/pipelines/shared/nextflow/color_workflow.nf @@ -70,7 +70,7 @@ process get_fasta { """ fasta_file="${id_file.baseName}.fasta" - perl $projectDir/../shared/perl/get_sequences.pl \ + perl $projectDir/../shared/import/get_sequences.pl \ --fasta-db ${params.fasta_db} \ --sequence-ids-file ${id_file} \ ${domain_map_arg} \ diff --git a/pipelines/shared/nextflow/sequence.nf b/pipelines/shared/nextflow/sequence.nf index 39604447..d937a2e9 100644 --- a/pipelines/shared/nextflow/sequence.nf +++ b/pipelines/shared/nextflow/sequence.nf @@ -1,4 +1,80 @@ +process filter_ids { + publishDir params.final_output_dir, mode: 'copy' + input: + path source_ids // table of all sequence IDs, including UniRef IDs + path source_meta // sequence metdata + path source_stats // statistics of source import process + output: + path 'accession_table.tab', emit: 'accession_table' // table of all sequence IDs, including UniRef IDs, filtered + path 'sequence_metadata.tab', emit: 'sequence_metadata' // sequence metdata in metadata format + path 'import_stats.json', emit: 'import_stats' // final statistics of source and filter import processes + path 'retrieval_ids.tab', emit: 'retrieval_ids' // list of IDs that came from the database, as opposed to user-specified FASTA files, including domain data + script: + filter_args = "" + if (params.filter) { + filter_args = params.filter.join(" --filter ") + filter_args = "--filter ${filter_args}" + } + """ + perl $projectDir/../shared/import/filter_ids.pl --efi-config ${params.efi_config} --efi-db ${params.efi_db} --sequence-version ${params.sequence_version} $filter_args + """ +} + +process get_source_ids { + publishDir params.final_output_dir, mode: 'copy' + output: + path 'source_ids.tab', emit: 'source_ids' + path 'source_seq.tab', emit: 'source_meta' + path 'source_stats.json', emit: 'source_stats' + path 'blast_hits.tab', optional: true + path 'seq_mapping.tab', emit: 'seq_mapping', optional: true + path 'unmatched_id.tab', optional: true + script: + + common_args = "--efi-config ${params.efi_config} --efi-db ${params.efi_db} --mode ${params.import_mode} --sequence-version ${params.sequence_version}" + + family_args = "" + if (params.families) { + family_args = "--family " + params.families + } + + if (params.domain) { + family_args = family_args + " --domain " + params.domain_region + if (params.domain_family) { + family_args = family_args + " --domain-family " + params.domain_family + } + } + + if (params.import_mode == "blast") { + // blast_hits.tab is provided as an output to the user + """ + blastall -p blastp -i ${params.input_file} -d ${params.import_blast_fasta_db} -m 8 -e ${params.import_blast_evalue} -b ${params.import_blast_num_matches} -o init_blast.out + if [[ -s init_blast.out ]]; then + awk '! /^#/ {print \$2"\t"\$11}' init_blast.out | sort -k2nr > blast_hits.tab + else + echo "BLAST did not return any matches. Verify that the sequence is a protein and not a nucleotide sequence." + exit 1 + fi + perl $projectDir/../shared/import/get_sequence_ids.pl $common_args $family_args --blast-output init_blast.out --blast-query ${params.input_file} + """ + } else if (params.import_mode == "accessions") { + """ + perl $projectDir/../shared/import/get_sequence_ids.pl $common_args $family_args --accessions ${params.input_file} + """ + } else if (params.import_mode == "fasta") { + """ + perl $projectDir/../shared/import/get_sequence_ids.pl $common_args $family_args --fasta ${params.input_file} --sequence-mapping-file seq_mapping.tab + """ + } else if (params.import_mode == "family") { + """ + perl $projectDir/../shared/import/get_sequence_ids.pl $common_args $family_args + """ + } else { + error "Mode '${params.import_mode}' not yet implemented" + } +} + process get_sequences { input: path accession_ids @@ -7,7 +83,7 @@ process get_sequences { path "${accession_ids}.fasta" """ if [[ -s "${accession_ids}" ]]; then - perl $projectDir/../shared/perl/get_sequences.pl --fasta-db ${fasta_db} --sequence-ids-file ${accession_ids} --output-sequence-file ${accession_ids}.fasta + perl $projectDir/../shared/import/get_sequences.pl --fasta-db ${fasta_db} --sequence-ids-file ${accession_ids} --output-sequence-file ${accession_ids}.fasta else touch ${accession_ids}.fasta fi diff --git a/pipelines/taxonomy/taxonomy.nf b/pipelines/taxonomy/taxonomy.nf index afd1c8bb..7465f4b2 100644 --- a/pipelines/taxonomy/taxonomy.nf +++ b/pipelines/taxonomy/taxonomy.nf @@ -1,5 +1,5 @@ -include { get_source_ids; filter_ids } from "../est/subworkflows/import.nf" +include { filter_ids; get_source_ids } from "../shared/nextflow/sequence.nf" process get_sunburst_data { publishDir params.final_output_dir, mode: 'copy' @@ -11,7 +11,7 @@ process get_sunburst_data { path 'sunburst_stats.json', emit: 'stats_file' script: """ - perl $projectDir/../est/import/get_sunburst_data.pl --efi-config ${params.efi_config} --efi-db ${params.efi_db} --sunburst-stats-file sunburst_stats.json + perl $projectDir/../shared/import/get_sunburst_data.pl --efi-config ${params.efi_config} --efi-db ${params.efi_db} --sunburst-stats-file sunburst_stats.json """ } From 93dbfeef7178ea36e8ce8ce38f93bb07c4d4d979 Mon Sep 17 00:00:00 2001 From: Nils Oberg Date: Wed, 8 Apr 2026 16:52:33 -0500 Subject: [PATCH 13/16] Add test for convergence ratio --- tests/modules/18a_conv_ratio.sh | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 tests/modules/18a_conv_ratio.sh diff --git a/tests/modules/18a_conv_ratio.sh b/tests/modules/18a_conv_ratio.sh new file mode 100644 index 00000000..97851d17 --- /dev/null +++ b/tests/modules/18a_conv_ratio.sh @@ -0,0 +1,21 @@ +#!/bin/bash +set -e + +if [[ ! -e "$EFI_TEST_SSN_UNIPROT" ]]; then + echo "Test skipped; missing $EFI_TEST_SSN_UNIPROT" + exit 0 +fi + +TEST_RESULTS_DIR=$1 +CONFIG_FILE=$2 + +self=$(basename "$0" .sh) +OUTPUT_DIR="$TEST_RESULTS_DIR/$self" + +NB_SIZE=10 + +rm -rf $OUTPUT_DIR + +./bin/create_conv_ratio_nextflow_params.py --output-dir $OUTPUT_DIR --colored-ssn-input /home/noberg/dev/EST/results/05a_colorssn_uniprot/color_ssn.xgmml.zip --efi-config $EFI_CONFIG_FILE --efi-db $EFI_DB_NAME --fasta-db $EFI_FASTA_DB --nextflow-config $CONFIG_FILE --ascore 5 +bash $OUTPUT_DIR/run_nextflow.sh + From 5dce647cbcf85bbb57ffb743811372375798f52e Mon Sep 17 00:00:00 2001 From: Nils Oberg Date: Fri, 10 Apr 2026 12:39:32 -0500 Subject: [PATCH 14/16] Convert input file variable to a value channel --- pipelines/convergenceratio/convergenceratio.nf | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pipelines/convergenceratio/convergenceratio.nf b/pipelines/convergenceratio/convergenceratio.nf index 509ab5f9..446d0409 100644 --- a/pipelines/convergenceratio/convergenceratio.nf +++ b/pipelines/convergenceratio/convergenceratio.nf @@ -79,9 +79,9 @@ process get_selected_id_lists { workflow { if (params.ssn_input =~ /\.zip$/) { - ssn_file = unzip_ssn(params.ssn_input) + ssn_file = unzip_ssn(Channel.value(file(params.ssn_input))) } else { - ssn_file = params.ssn_input + ssn_file = Channel.value(file(params.ssn_input)) } // @@ -111,7 +111,7 @@ workflow { .filter { file -> !file.name.contains("singleton") } // Don"t include singletons in the analysis .map { file -> // Transform the file name into a cluster ID. Removes "cluster_UniProt_" (or - // "cluster_UniRefXX_") from the front if present + // "cluster_UniRefXX_") from the front if present, leaving "Cluster_#" def clean_id = file.simpleName.replaceAll(/^cluster_Uni(Prot|Ref90|Ref50)_/, "") return tuple(clean_id, file) } From 2fb86eb0922123443950fd8f14f12a2320ceb02a Mon Sep 17 00:00:00 2001 From: Nils Oberg Date: Fri, 10 Apr 2026 13:46:16 -0500 Subject: [PATCH 15/16] Rename command line argument for consistency --- bin/create_conv_ratio_nextflow_params.py | 12 ++++++------ pipelines/est/est.nf | 4 ++-- pipelines/shared/nextflow/blast.nf | 11 +++++++++-- pipelines/taxonomy/taxonomy.nf | 10 ++++++++-- 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/bin/create_conv_ratio_nextflow_params.py b/bin/create_conv_ratio_nextflow_params.py index 914cdf08..966061b4 100755 --- a/bin/create_conv_ratio_nextflow_params.py +++ b/bin/create_conv_ratio_nextflow_params.py @@ -14,7 +14,7 @@ def add_args(parser: argparse.ArgumentParser): Add arguments for Convergence Ratio pipeline to ``parser`` """ parser.add_argument("--ascore", required=False, type=int, default=5, help="The alignment score to use for the BLAST computations; this is converted to an e-value") - parser.add_argument("--colored-ssn-input", required=True, type=str, help="The SSN file to use for computing convergence ratios; must be the output of the Color SSN or Cluster Analysis pipelines") + parser.add_argument("--ssn-input", required=True, type=str, help="The SSN file to use for computing convergence ratios") parser.add_argument("--fasta-db", type=str, required=True, help="FASTA file or BLAST database to retrieve sequences from") shared_args.add_args(parser) @@ -31,8 +31,8 @@ def check_args(args: argparse.Namespace) -> argparse.Namespace: else: args = validated_args - if not os.path.exists(args.colored_ssn_input): - print(f"SSN Input file '{args.colored_ssn_input}' does not exist") + if not os.path.exists(args.ssn_input): + print(f"SSN Input file '{args.ssn_input}' does not exist") fail = True if len(glob.glob(f"{args.fasta_db}.*")) == 0: @@ -46,7 +46,7 @@ def check_args(args: argparse.Namespace) -> argparse.Namespace: print("Failed to render params template") exit(1) else: - args.colored_ssn_input = os.path.abspath(args.colored_ssn_input) + args.ssn_input = os.path.abspath(args.ssn_input) args.fasta_db = os.path.abspath(args.fasta_db) return args @@ -55,7 +55,7 @@ def create_parser() -> argparse.ArgumentParser: add_args(parser) return parser -def render_params(colored_ssn_input, ascore, efi_config, efi_db, fasta_db, output_dir, **kwargs: dict): +def render_params(ssn_input, ascore, efi_config, efi_db, fasta_db, output_dir, **kwargs: dict): evalue = f"1e-{ascore}" params = { "blast_evalue": evalue, @@ -63,7 +63,7 @@ def render_params(colored_ssn_input, ascore, efi_config, efi_db, fasta_db, outpu "efi_db": efi_db, "fasta_db": fasta_db, "final_output_dir": output_dir, - "ssn_input": colored_ssn_input, + "ssn_input": ssn_input, } # Handle kwargs dict, assuming each entry is a parameter to be added to params diff --git a/pipelines/est/est.nf b/pipelines/est/est.nf index 069788c2..89600634 100644 --- a/pipelines/est/est.nf +++ b/pipelines/est/est.nf @@ -13,8 +13,8 @@ workflow { blast_input = initial_data.fasta_file.map { fasta_file -> tuple("ALL_DATA", fasta_file) } axa_results = ALL_BY_ALL(blast_input) - blast_parquet = axa_results.blast_parquet.map { junk, file -> file } - fasta_lengths_parquet = axa_results.fasta_lengths_parquet.map { junk, file -> file } + blast_parquet = axa_results.blast_parquet.map { fid, file -> file } + fasta_lengths_parquet = axa_results.fasta_lengths_parquet.map { fid, file -> file } results = REPORTING(blast_parquet, fasta_lengths_parquet, initial_data.fasta_file, initial_data.accession_table, initial_data.import_stats) } diff --git a/pipelines/shared/nextflow/blast.nf b/pipelines/shared/nextflow/blast.nf index d78813eb..d6dcb955 100644 --- a/pipelines/shared/nextflow/blast.nf +++ b/pipelines/shared/nextflow/blast.nf @@ -83,7 +83,11 @@ process condense_redundant { script: """ - cd-hit -d 0 -c 1 -s 1 -i ${fasta_file} -o sequences.fasta -M "${params.cdhit_memory_limit}" + cd-hit \ + -d 0 -c 1 -s 1 \ + -i ${fasta_file} \ + -o sequences.fasta \ + -M "${params.cdhit_memory_limit}" """ } @@ -142,7 +146,10 @@ process split_fasta { script: """ mkdir parts - seqkit split2 ${fasta_file} -p ${params.num_fasta_shards} --out-dir parts + seqkit split2 \ + ${fasta_file} \ + -p ${params.num_fasta_shards} \ + --out-dir parts """ } diff --git a/pipelines/taxonomy/taxonomy.nf b/pipelines/taxonomy/taxonomy.nf index 7465f4b2..9c4e1cae 100644 --- a/pipelines/taxonomy/taxonomy.nf +++ b/pipelines/taxonomy/taxonomy.nf @@ -11,7 +11,10 @@ process get_sunburst_data { path 'sunburst_stats.json', emit: 'stats_file' script: """ - perl $projectDir/../shared/import/get_sunburst_data.pl --efi-config ${params.efi_config} --efi-db ${params.efi_db} --sunburst-stats-file sunburst_stats.json + perl $projectDir/../shared/import/get_sunburst_data.pl \ + --efi-config ${params.efi_config} \ + --efi-db ${params.efi_db} \ + --sunburst-stats-file sunburst_stats.json """ } @@ -24,7 +27,10 @@ process process_sunburst_stats { path 'stats.json' script: """ - python $projectDir/statistics/update_import_stats.py --stats-file ${sunburst_stats_file} ${import_stats_file} --output stats.json + python $projectDir/statistics/update_import_stats.py \ + --stats-file ${sunburst_stats_file} \ + ${import_stats_file} \ + --output stats.json """ } From c55edbc61986e0ca1f2079520cbbc84a2dc99c3e Mon Sep 17 00:00:00 2001 From: Nils Oberg Date: Mon, 13 Apr 2026 11:53:18 -0500 Subject: [PATCH 16/16] Minor changes to merge_conv_ratio to better handle exceptions --- .../statistics/merge_conv_ratios.py | 29 ++++++++++--------- tests/modules/18a_conv_ratio.sh | 4 +-- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/pipelines/convergenceratio/statistics/merge_conv_ratios.py b/pipelines/convergenceratio/statistics/merge_conv_ratios.py index d62f47cd..1a01041e 100644 --- a/pipelines/convergenceratio/statistics/merge_conv_ratios.py +++ b/pipelines/convergenceratio/statistics/merge_conv_ratios.py @@ -40,31 +40,35 @@ def load_blast_conv_files(input_files: List) -> Dict[str, Dict[str, str]]: filename = os.path.basename(json_file) # Extract the cluster number - match = re.search(r'[cC]luster_(\d+)_conv_ratio\.json', filename) + match = re.search(r'Cluster_(\d+)_conv_ratio\.json', filename) if match: cluster_id = match.group(1) else: - cluster_id = filename.replace('_conv_ratio.json', '') + # Terminate because something bad happened, if the files aren't in the specified format + raise Exception(f"Invalid filename format '{filename}'") + conv_ratio = None + edges = None + nodes = None try: with open(json_file, 'r') as fh: data = json.load(fh) - conv_ratio = data.get('convergence_ratio', 'NA') - edges = data.get('num_blast_edges', 'NA') - nodes = data.get('num_unique_ids', 'NA') + conv_ratio = data.get('convergence_ratio', None) + edges = data.get('num_blast_edges', None) + nodes = data.get('num_unique_ids', None) - if conv_ratio != 'NA': + if conv_ratio != None: conv_ratio = float(conv_ratio) - - blast_data[cluster_id] = { - 'ratio': conv_ratio, - 'edges': edges, - 'nodes': nodes - } except Exception as e: print(f"Error processing file {json_file}: {e}", file=sys.stderr) + blast_data[cluster_id] = { + 'ratio': conv_ratio, + 'edges': edges, + 'nodes': nodes + } + return blast_data def load_ssn_conv_data(data_file: str) -> Dict[str, Dict[str, str]]: @@ -87,7 +91,6 @@ def load_ssn_conv_data(data_file: str) -> Dict[str, Dict[str, str]]: header = next(fh) for line in fh: - #Cluster Number Convergence Ratio Number of SSN Nodes Number of UniProt IDs Number of Edges cluster_num, conv_ratio, num_nodes, num_ids, num_edges = re.split(r'\t', line.strip()) ssn_data[cluster_num] = { 'ratio': conv_ratio, diff --git a/tests/modules/18a_conv_ratio.sh b/tests/modules/18a_conv_ratio.sh index 97851d17..7a8a506e 100644 --- a/tests/modules/18a_conv_ratio.sh +++ b/tests/modules/18a_conv_ratio.sh @@ -12,10 +12,8 @@ CONFIG_FILE=$2 self=$(basename "$0" .sh) OUTPUT_DIR="$TEST_RESULTS_DIR/$self" -NB_SIZE=10 - rm -rf $OUTPUT_DIR -./bin/create_conv_ratio_nextflow_params.py --output-dir $OUTPUT_DIR --colored-ssn-input /home/noberg/dev/EST/results/05a_colorssn_uniprot/color_ssn.xgmml.zip --efi-config $EFI_CONFIG_FILE --efi-db $EFI_DB_NAME --fasta-db $EFI_FASTA_DB --nextflow-config $CONFIG_FILE --ascore 5 +./bin/create_conv_ratio_nextflow_params.py --output-dir $OUTPUT_DIR --ssn-input $EFI_TEST_SSN_UNIPROT --efi-config $EFI_CONFIG_FILE --efi-db $EFI_DB_NAME --fasta-db $EFI_FASTA_DB --nextflow-config $CONFIG_FILE --ascore 5 bash $OUTPUT_DIR/run_nextflow.sh