Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions bin/create_conv_ratio_nextflow_params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#!/usr/bin/env python3

import argparse
import glob
import json
import os

import shared_args

NXF_SCRIPT = "pipelines/convergenceratio/convergenceratio.nf"

def add_args(parser: argparse.ArgumentParser):
"""
Add arguments for Convergence Ratio pipeline to ``parser``
"""
parser.add_argument("--ascore", required=False, type=int, default=5, help="The alignment score to use for the BLAST computations; this is converted to an e-value")
parser.add_argument("--ssn-input", required=True, type=str, help="The SSN file to use for computing convergence ratios")
parser.add_argument("--fasta-db", type=str, required=True, help="FASTA file or BLAST database to retrieve sequences from")
shared_args.add_args(parser)

def check_args(args: argparse.Namespace) -> argparse.Namespace:
"""
Test file paths and rewrite them to be absolute
"""
fail = False

# Check for shared args validity
validated_args = shared_args.check_args(args)
if validated_args is None:
fail = True
else:
args = validated_args

if not os.path.exists(args.ssn_input):
print(f"SSN Input file '{args.ssn_input}' does not exist")
fail = True

if len(glob.glob(f"{args.fasta_db}.*")) == 0:
print(f"FASTA database '{args.fasta_db}' not found")
fail = True

if args.workflow_def is None:
args.workflow_def = os.path.abspath(NXF_SCRIPT)

if fail:
print("Failed to render params template")
exit(1)
else:
args.ssn_input = os.path.abspath(args.ssn_input)
args.fasta_db = os.path.abspath(args.fasta_db)
return args

def create_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Render params.yml for GND nextflow pipeline")
add_args(parser)
return parser

def render_params(ssn_input, ascore, efi_config, efi_db, fasta_db, output_dir, **kwargs: dict):
evalue = f"1e-{ascore}"
params = {
"blast_evalue": evalue,
"efi_config": efi_config,
"efi_db": efi_db,
"fasta_db": fasta_db,
"final_output_dir": output_dir,
"ssn_input": ssn_input,
}

# Handle kwargs dict, assuming each entry is a parameter to be added to params
params.update(kwargs)

# Remove parameter keys with None values
params = {key: value for key, value in params.items() if value != None}

params_file = os.path.join(output_dir, shared_args.PARAMS_NAME)
with open(params_file, "w") as f:
json.dump(params, f, indent=4)
print(f"Wrote params to '{params_file}'")
return params_file

if __name__ == "__main__":
args = check_args(create_parser().parse_args())
params_file = render_params(**vars(args))
shared_args.save_run_script(args, workflow_def=args.workflow_def, params_file=params_file)

2 changes: 1 addition & 1 deletion conf/docker.config
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ process {
cpus = 1
memory = 5.GB
container = 'enzymefunctioninitiative/efi-est:latest'
containerOptions = "-v $projectDir:$projectDir -v $projectDir/../shared:$projectDir/../shared -v $projectDir/../est:$projectDir/../est -v $projectDir/../../lib:$projectDir/../../lib -v $EFI_DATA_DIR:$EFI_DATA_DIR"
containerOptions = "-v $projectDir:$projectDir -v $projectDir/../shared:$projectDir/../shared -v $projectDir/../../lib:$projectDir/../../lib -v $EFI_DATA_DIR:$EFI_DATA_DIR"

withName: blastreduce {
memory = 20.GB
Expand Down
2 changes: 1 addition & 1 deletion conf/singularity.config
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ process {
cpus = 1
memory = 5.GB
container = 'enzymefunctioninitiative/efi-est:latest'
containerOptions = "--bind $projectDir:$projectDir --bind $projectDir/../shared:$projectDir/../shared --bind $projectDir/../est:$projectDir/../est --bind $projectDir/../../:$projectDir/../../ --bind $projectDir/../../lib:$projectDir/../../lib --bind $EFI_DATA_DIR:$EFI_DATA_DIR"
containerOptions = "--bind $projectDir:$projectDir --bind $projectDir/../shared:$projectDir/../shared --bind $projectDir/../../:$projectDir/../../ --bind $projectDir/../../lib:$projectDir/../../lib --bind $EFI_DATA_DIR:$EFI_DATA_DIR"

withName: 'color_ssn|create_gnd|create_gnns|color_gnt_ssn|compute_clusters' {
memory = 10.GB
Expand Down
160 changes: 160 additions & 0 deletions pipelines/convergenceratio/convergenceratio.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@

include { all_by_all_blast; blastreduce; blastreduce_transcode_fasta; condense_redundant; create_blast_db; restore_condensed; split_fasta } from "../shared/nextflow/blast.nf"
include { unzip_ssn } from "../shared/nextflow/util.nf"
include { compute_clusters; get_conv_ratio_table; get_id_list; get_ssn_id_info } from "../shared/nextflow/color_workflow.nf"

process compute_blast_conv_ratio {
input:
tuple val(cluster_id), path(blast_parquet), path(fasta_file)

output:
path("${cluster_id}_conv_ratio.json")

script:
"""
python $projectDir/../shared/statistics/conv_ratio.py \
--blast-output ${blast_parquet} \
--fasta ${fasta_file} \
--output "${cluster_id}_conv_ratio.json"
"""
}

process merge_conv_ratios {
publishDir params.final_output_dir, mode: "copy", pattern: "{conv_ratio.tab}"

input:
path stats_files
path cr_table

output:
path "conv_ratio.tab"

script:
"""
python $projectDir/statistics/merge_conv_ratios.py \
--ssn-conv-ratio ${cr_table} \
--stats ${stats_files} \
--output conv_ratio.tab
"""
}

process get_fasta_files {
input:
tuple val(cluster_id), path(id_file)

output:
tuple val(cluster_id), path("*.fasta", arity: "1")

script:
"""
perl $projectDir/../shared/import/get_sequences.pl \
--fasta-db ${params.fasta_db} \
--sequence-ids-file ${id_file} \
--output-sequence-file "${id_file.baseName}.fasta"
"""
}

process get_selected_id_lists {
input:
path uniprot_dir
path uniref90_dir
path uniref50_dir

output:
path "selected_ids/*.txt", emit: "id_files"

script:
"""
mkdir -p selected_ids

if ls ${uniref50_dir}/*.txt 1> /dev/null 2>&1; then
cp ${uniref50_dir}/*.txt selected_ids/
elif ls ${uniref90_dir}/*.txt 1> /dev/null 2>&1; then
cp ${uniref90_dir}/*.txt selected_ids/
else
cp ${uniprot_dir}/*.txt selected_ids/
fi
"""
}

workflow {
if (params.ssn_input =~ /\.zip$/) {
ssn_file = unzip_ssn(Channel.value(file(params.ssn_input)))
} else {
ssn_file = Channel.value(file(params.ssn_input))
}

//
// STEP 1: GET ID LISTS AND COMPUTE CLUSTERS
//

ssn_data = get_ssn_id_info(ssn_file)

compute_info = compute_clusters(ssn_data.edgelist, ssn_data.index_seqid_map)

// Convert the sequence type to a value channel
sequence_type_val = ssn_data.sequence_type.map { it.trim() }

id_list_data = get_id_list(compute_info.cluster_id_map, compute_info.singletons, ssn_data.seqid_source_map, sequence_type_val)

// Get the ID lists that will be used, e.g. the original sequences from the SSN
input_seq_type_id_lists = get_selected_id_lists(id_list_data.uniprot_dir, id_list_data.uniref90_dir, id_list_data.uniref50_dir)

//
// STEP 2: OBTAIN THE FASTA FILES
Comment thread
nilsoberg marked this conversation as resolved.
//

// Get the cluster IDs from the file names
cluster_id_lists = input_seq_type_id_lists
.flatten()
.filter { file -> !file.name.contains("_All") } // Don"t include the file with all sequences in the analysis
.filter { file -> !file.name.contains("singleton") } // Don"t include singletons in the analysis
.map { file ->
// Transform the file name into a cluster ID. Removes "cluster_UniProt_" (or
// "cluster_UniRefXX_") from the front if present, leaving "Cluster_#"
def clean_id = file.simpleName.replaceAll(/^cluster_Uni(Prot|Ref90|Ref50)_/, "")
return tuple(clean_id, file)
}

// Get FASTA files; the return value is a channel of tuples of [clusterId, file]
cluster_fasta = get_fasta_files(cluster_id_lists)

//
// STEP 3: RUN THE EST-BASED ALL-BY-ALL WORKFLOW
// Run a workflow that is nearly identical to the ALL_BY_ALL workflow in the est pipeline.
// See the workflow for more information on how this works.
//

reduced_fasta = condense_redundant(cluster_fasta)

blast_databases = create_blast_db(reduced_fasta.fasta_file)

fasta_lengths_parquet = blastreduce_transcode_fasta(cluster_fasta)

fasta_shards = split_fasta(reduced_fasta.fasta_file)

blast_input = blast_databases.combine(fasta_shards.transpose(), by: 0)

blast_fractions = all_by_all_blast(blast_input).groupTuple()

reduced_blast_parquet = blastreduce(blast_fractions.join(fasta_lengths_parquet))

// Expand redundant sequences after BLAST computation (formerly known as demultiplex)
blast_parquet = restore_condensed(reduced_blast_parquet.join(reduced_fasta.condensed))

//
// STEP 4: COMPUTE CONVERGENCE RATIOS
//

// Compute the convergence ratio based on the edges and nodes in each cluster
cr_table = get_conv_ratio_table(ssn_data.edgelist, ssn_data.index_seqid_map, compute_info.cluster_id_map, ssn_data.seqid_source_map)

// Compute the convergence ratio based on the BLAST results
//stats_files = compute_blast_conv_ratio(blast_parquet.combine(fasta_lengths_parquet, by: 0))
//stats_files = compute_blast_conv_ratio(reduced_blast_parquet.combine(fasta_lengths_parquet, by: 0))
stats_files = compute_blast_conv_ratio(reduced_blast_parquet.join(fasta_lengths_parquet))

// Merge the data for all the clusters into one file
merge_conv_ratios(stats_files.collect(), cr_table)
}

Loading