In [1]:
import wolf
import pandas as pd
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', None)
import re
import subprocess

### CLUMPS
#### - `GitHub repo`: https://github.com/broadinstitute/getzlab-CLUMPS2
---

#### 3 main tasks from the `CLUMPS` algorithm
- _(task 0) **localization** : localization_
- **prep** : pre-processing
- **run** : core clumps algorithm
- **post** : post-processing
---

### (0) LOCALIZATION task

In [2]:

clumpsLocalization_results = {'cancer_genes': 'rodisk://canine-b7f9872002626d1a514fcdf5e295a8bd/cancer_genes/allCancerGenes.txt', 
                              'coverage_track': 'rodisk://canine-b7f9872002626d1a514fcdf5e295a8bd/coverage_track/WEx_cov.fwb', 
                              'coverage_track_index': 'rodisk://canine-b7f9872002626d1a514fcdf5e295a8bd/coverage_track_index/WEx_cov.fwi', 
                              'fasta': 'rodisk://canine-b7f9872002626d1a514fcdf5e295a8bd/fasta/UP000005640_9606.fasta.gz', 'genome_2bit': 
                              'rodisk://canine-b7f9872002626d1a514fcdf5e295a8bd/genome_2bit/hg19.2bit',  
                              'gpmaps': 'rodisk://canine-b7f9872002626d1a514fcdf5e295a8bd/gpmaps/genomeProteomeMaps.txt', 
                              'maf': 'rodisk://canine-b7f9872002626d1a514fcdf5e295a8bd/maf/pancan15k.v18.maf', 
                              'pdb_dir': 'rodisk://canine-b7f9872002626d1a514fcdf5e295a8bd/pdb_dir/pdb', 
                              'prot2pdb_chunks': 'rodisk://canine-b7f9872002626d1a514fcdf5e295a8bd/prot2pdb_chunks/huniprot2pdb.run18_chunks', 
                              'uniprot_map': 'rodisk://canine-b7f9872002626d1a514fcdf5e295a8bd/uniprot_map/huniprot2pdb.run18.filt.txt', 
                              'small_maf': 'rodisk://canine-b7f9872002626d1a514fcdf5e295a8bd/small_maf/pancan100.test.maf'}

docker_image = 'gcr.io/broad-getzlab-workflows/adunford_clumps_highertimeout:latest'
run_name = "clumps_full_2hrtimeout_100kmax"

### (1) PREP task

In [3]:
class clumps_prep_task(wolf.Task):
    # the pre-processing step which prepares input for clumps ('run step')
    # Preparation for clumps input files:
    # computes mutational frequencies, spectra, and identifies protein structures needed
    resources = { "mem" : "8G" }
    
    name = "%s_pre-processing_task" % run_name
    
    # input data for the 'prep' step is the mutation annotation file (maf)
    # <Required> Input file for CLUMPS. Default expects .maf
    inputs = {
        "maf" : None,
        "genome_2bit" : None,
        "fasta" : None, 
        "gpmaps" : None
#         "maf" : localization["maf"], # downstream task inputs are outputs from the localizaton task
#         "genome_2bit" : localization["genome_2bit"],
#         "fasta" : localization["fasta"],
#         "gpmaps" : localization["gpmaps"]
    }
    
    
    ### bash script to run the `prep-step` ###
    # first look for the output directory and create it if it does not already exist
    # take the input mutation data (maf)
    # run the clumps-prep step
    script = """
    mkdir clumps_preprocess
    clumps-prep --input ${maf} --output_dir clumps_preprocess --hgfile ${genome_2bit} --fasta ${fasta} --gpmaps ${gpmaps}
    tar cf clumps_preprocess.tar clumps_preprocess
    """
    
    # place the files from the 'clumps-prep' step into the output directory
    output_patterns = {
        "prep_outdir" : "clumps_preprocess.tar"
    }
    
    # Docker images are specified as "<image>[:tag]". If [tag] is not given, it defaults to "latest"
    docker = docker_image

In [4]:
clumpsPrep = clumps_prep_task()
clumpsPrep_results = clumpsPrep.run(
    maf = clumpsLocalization_results["maf"],
    genome_2bit = clumpsLocalization_results["genome_2bit"],
    fasta = clumpsLocalization_results["fasta"],
    gpmaps = clumpsLocalization_results["gpmaps"]
)

[20220703-17:21:43] [prefect] Available disk storage at /mnt/nfs is small (195 GB remaining)
[20220703-17:21:43] [prefect] Starting Slurm controller ...
[20220703-17:21:44] [prefect] Waiting up to 60 seconds for Slurm controller to start ...
[20220703-17:21:52] [prefect] Localizing inputs...
[20220703-17:21:52] [prefect] Job staged on SLURM controller in: /mnt/nfs/workspace/clumps_full_2hrtimeout_100kmax_pre-processing_task__2022-06-16--20-16-15_ohy20dy_rgz4h0q_xurchlelmcp4c
[20220703-17:21:52] [prefect] Preparing pipeline script
[20220703-17:21:52] [prefect] 1/1 jobs avoided
[20220703-17:22:14] [prefect] Terminating all jobs ... 
[20220703-17:22:14] [prefect] done


---

In [5]:
#if we want to use chunklist_18
prot2pdb_dir = clumpsLocalization_results['prot2pdb_chunks'].replace('rodisk://','')
chunks_list_18 = subprocess.check_output("ls /mnt/nfs/ro_disks/%s/*" %prot2pdb_dir, shell = True).decode().rstrip().split("\n")
chunks_list_18

['/mnt/nfs/ro_disks/canine-b7f9872002626d1a514fcdf5e295a8bd/prot2pdb_chunks/huniprot2pdb.run18_chunks/huniprot2pdb_chunk_00000.gz',
 '/mnt/nfs/ro_disks/canine-b7f9872002626d1a514fcdf5e295a8bd/prot2pdb_chunks/huniprot2pdb.run18_chunks/huniprot2pdb_chunk_00001.gz',
 '/mnt/nfs/ro_disks/canine-b7f9872002626d1a514fcdf5e295a8bd/prot2pdb_chunks/huniprot2pdb.run18_chunks/huniprot2pdb_chunk_00002.gz',
 '/mnt/nfs/ro_disks/canine-b7f9872002626d1a514fcdf5e295a8bd/prot2pdb_chunks/huniprot2pdb.run18_chunks/huniprot2pdb_chunk_00003.gz',
 '/mnt/nfs/ro_disks/canine-b7f9872002626d1a514fcdf5e295a8bd/prot2pdb_chunks/huniprot2pdb.run18_chunks/huniprot2pdb_chunk_00004.gz',
 '/mnt/nfs/ro_disks/canine-b7f9872002626d1a514fcdf5e295a8bd/prot2pdb_chunks/huniprot2pdb.run18_chunks/huniprot2pdb_chunk_00005.gz',
 '/mnt/nfs/ro_disks/canine-b7f9872002626d1a514fcdf5e295a8bd/prot2pdb_chunks/huniprot2pdb.run18_chunks/huniprot2pdb_chunk_00006.gz',
 '/mnt/nfs/ro_disks/canine-b7f9872002626d1a514fcdf5e295a8bd/prot2pdb_chunks/

---

### (2) RUN task

In [17]:
class clumps_run_task(wolf.Task):
    # a scatter task to execute multiple runs of the same script in parallel
    # we will parallelize this using ~300-400 chards
    # this task is the main clumps processing/algorithm
    resources = { "mem" : "8G" }
    
    name = "%s_clumps_run_UniformSampler" % run_name
    
    # the input files for this step are the different individual prot2pdb chunks from the huniprot2pdb_chunks folder
    # provide a list of all the individual prot2pdb chunks (or the file path to each prot2pdb chunks file)
    
    # <Required> Directory of files titled with Uniprot IDs that have mutation information
    # <Required> File mapping uniprot ID to PDB ID with residue-level mapping information.
    # coverage_track is on the gs bucket
    inputs = {
        "clumps_preprocess" : None,
        "prot2pdb_chunks" : None,
        "pdb_dir" : None,
        "coverage_track" : None,
        "coverage_track_index" : None, # not actually used as an input; just needs to be localized alongside coverage_track
        "genome_2bit" : None,
        "fasta" : None,
        "gpmaps" : None
    }
    
    overrides = { "prot2pdb_chunks" : "delayed" }
    
    ### bash script to run the `run-step` ###
    # un-tar the `clumps_preprocess.tar` file which is the output directory from the 'clumps-prep step'
    # this will create a diretory(folder) of the same name (clumps_preprocess)
    script = """    
    tar xf $clumps_preprocess
    clumps --muts clumps_preprocess/split_proteins \
        --maps ${prot2pdb_chunks} \
        --mut_freq clumps_preprocess/mut_freq.txt \
        --out_dir clumps_results \
        --sampler UniformSampler \
        --coverage_track ${coverage_track} \
        --mut_spectra clumps_preprocess/mut_spectra.txt \
        --pdb_dir ${pdb_dir} \
        --hgfile ${genome_2bit} --fasta ${fasta} --gpmaps ${gpmaps} \
        --max_rand  10000  \
        --threads 8
    
    tar cf clumps_results.tar clumps_results

    """
    output_patterns = {
        "run_outdir" : "clumps_results.tar"
    }
    
    # Docker Image
    docker = docker_image

In [None]:
# THIS IS FOR TESTING
# running the `run_task`
clumpsRun = clumps_run_task()
clumpsRun_results = clumpsRun.run(
    clumps_preprocess = clumpsPrep_results["prep_outdir"],
    prot2pdb_chunks = chunks_list_18,
    pdb_dir = clumpsLocalization_results["pdb_dir"],
    coverage_track = clumpsLocalization_results["coverage_track"],
    coverage_track_index = clumpsLocalization_results["coverage_track_index"],
    genome_2bit = clumpsLocalization_results["genome_2bit"],
    fasta = clumpsLocalization_results["fasta"],
    gpmaps = clumpsLocalization_results["gpmaps"]
)

[20220705-15:14:56] [prefect] Available disk storage at /mnt/nfs is small (199 GB remaining)
[20220705-15:14:56] [prefect] Starting Slurm controller ...
[20220705-15:14:56] [prefect] Waiting up to 60 seconds for Slurm controller to start ...
[20220705-15:15:09] [prefect] Localizing inputs...
[20220705-15:15:15] [prefect] Job staged on SLURM controller in: /mnt/nfs/workspace/clumps_full_2hrtimeout_100kmax_clumps_run_UniformSampler__2022-07-04--17-07-35_1b1m1zy_rgz4h0q_vijxlsy3mpyy4
[20220705-15:15:15] [prefect] Preparing pipeline script


### (3) POST-PROCESS task

In [18]:
class clumps_postprocess_task(wolf.Task):
    # Generates summary files from array outputs of clumps.
    # gather task which takes an array of inputs, but rather than dispatch an individual job for each input,
    # instead gather them together.
    resources = { "mem" : "8G" }
    
    name = "clumps_post-processing_task_mutspecsampler"
    
    # <Required> Results directory from CLUMPS
    # <Required> Split protein directory with mutation mapping information.
    inputs = {
        "clumps_preprocess" : None,
        "clumps_results" : None, #gather_param (paths to all input files being gathered)
        "cancer_genes" : None,
        "uniprot_map" : None,
        "pdb_dir" : None
    }
    
    ### bash script to run the `post-step` ###
    # untar the 'clumps_preprocess' to access the files
    # untar the 'clumps_results' to access the input files
    script = """
    tar xf $clumps_preprocess
    while read -r i; do
      tar xf $i
    done < $clumps_results
    clumps-postprocess --input_dir clumps_results \
      --proteins_dir clumps_preprocess/split_proteins \
      --cancer_genes ${cancer_genes} \
      --uniprot_map ${uniprot_map} \
      --pdb_dir ${pdb_dir} \
      --output_file clumps_output.tsv
    """
    
    # Output file from CLUMPS with list of genes
    output_patterns = {
        "clumps_output" : "clumps_output.tsv"
    }
    
    # Docker Image
    docker = docker_image

### (4) WORKFLOW
using all 3 pre-defined taks

In [19]:
def clumps_workflow(maf, genome_2bit, fasta, gpmaps, prot2pdb_chunks, pdb_dir, coverage_track, cancer_genes, uniprot_map):
    # (step-#0): localization task
    localization = wolf.localization.BatchLocalDisk(
    # "files" parameter is a dict mapping output name -> file path
      files = {
        "maf" : maf,
        "genome_2bit" : genome_2bit,
        "fasta" : fasta,
        "gpmaps" : gpmaps,
        "prot2pdb_chunks" : prot2pdb_chunks,
        "pdb_dir" : pdb_dir,
        "coverage_track" : coverage_track,
        "cancer_genes" : cancer_genes,
        "uniprot_map" : uniprot_map
      }
    )
    
    # (step-#1): pre-processing "prep" task
    clumps_prep = clumps_prep_task(
        inputs = {
            "maf" : localization["maf"], # downstream task inputs are outputs from the localizaton task
            "genome_2bit" : localization["genome_2bit"],
            "fasta" : localization["fasta"],
            "gpmaps" : localization["gpmaps"]
        }
    )

    # (step-#2): run task
    clumps_run = clumps_run_task(
        inputs = {
            "clumps_preprocess" : clumps_prep["prep_outdir"],
            "prot2pdb_chunks" : chunks_list_18,
            "pdb_dir" : pdb_dir,
            "coverage_track" : localization["coverage_track"],
            "genome_2bit" : localization["genome_2bit"],
            "fasta" : localization["fasta"],
            "gpmaps" : localization["gpmaps"]
        }
    )
    
    # (step-#3): post-processing task
    clumps_post = clumps_postprocess_task(
        inputs = {
            "clumps_preprocess" : clumps_prep["prep_outdir"],
            "clumps_results" : [clumps_run["run_outdir"]], # gather_parameter (takes the outputs of the `run-step` as inputs),
            "cancer_genes" : localization["cancer_genes"],
            "uniprot_map" : localization["uniprot_map"]
        }
    )
    
    # Docker Image
    # docker = "gcr.io/broad-getzlab-workflows/clumps:v51"

### (5) SUBMIT the job (run the workflow script)

In [20]:
with wolf.Workflow(
    workflow = clumps_workflow,
    conf = { "clust_frac" : 1 }, 
    common_task_opts = { "retry" : 5 } # retry every task up to 5 times
) as clumps_analysis:
    ca = clumps_analysis.run(
        maf = "gs://sa-clumps2-ref/dat/pancan15k.v18.maf",
        genome_2bit = "gs://sa-clumps2-ref/dat/hg19.2bit",
        fasta = "gs://sa-clumps2-ref/dat/UP000005640_9606.fasta.gz",
        gpmaps = "gs://sa-clumps2-ref/dat/genomeProteomeMaps.txt",
        prot2pdb_chunks = chunks_list,
        pdb_dir = pdb_dir,
        coverage_track = "gs://sa-clumps2-ref/dat/cov/WEx_cov.fwb",
        RUN_NAME = "clumps analysis"
    )

AttributeError: module 'prefect.engine' has no attribute 'executors'

In [None]:
clumps_analysis.results


In [11]:
# THIS IS FOR TESTING
# running the `post_task`
clumpsPost = clumps_postprocess_task()
clumpsPost_results = clumpsPost.run(
    clumps_preprocess = clumpsPrep_results["prep_outdir"],
    clumps_results = [clumpsRun_results["run_outdir"]],
    cancer_genes = clumpsLocalization_results["cancer_genes"],
    uniprot_map = clumpsLocalization_results["uniprot_map"],
    pdb_dir = clumpsLocalization_results["pdb_dir"]
)

[20220317-18:43:37] [prefect] Available disk storage at /mnt/nfs is small (148 GB remaining)
[20220317-18:43:37] [prefect] Starting Slurm controller ...
[20220317-18:43:37] [prefect] Waiting up to 60 seconds for Slurm controller to start ...
[20220317-18:43:39] [prefect] Cannot recover preexisting task outputs: [Errno 2] No such file or directory: '/mnt/nfs/workspace/clumps_post-processing_task_mutspecsampler__2022-03-17--18-43-38_kuiubea_vtg5ndy_pa2ziruvonx1e/jobs/0'
[20220317-18:43:39] [prefect] Overwriting output and aborting job avoidance.
[20220317-18:43:39] [prefect] Localizing inputs...
[20220317-18:43:45] [prefect] Job staged on SLURM controller in: /mnt/nfs/workspace/clumps_post-processing_task_mutspecsampler__2022-03-17--18-43-38_kuiubea_vtg5ndy_pa2ziruvonx1e
[20220317-18:43:45] [prefect] Preparing pipeline script
[20220317-18:49:17] [prefect] Finished with status COMPLETED


In [None]:
# running the `prep_task` for full set, since set of 100 completed
clumpsPrep = clumps_prep_task()
clumpsPrep_results = clumpsPrep.run(
    maf = clumpsLocalization_results["maf"],
    genome_2bit = clumpsLocalization_results["genome_2bit"],
    fasta = clumpsLocalization_results["fasta"],
    gpmaps = clumpsLocalization_results["gpmaps"]
)
clumpsRun = clumps_run_task()
clumpsRun_results = clumpsRun.run(
    clumps_preprocess = clumpsPrep_results["prep_outdir"],
    prot2pdb_chunks = chunks_list_18,
    pdb_dir = clumpsLocalization_results["pdb_dir"],
    coverage_track = clumpsLocalization_results["coverage_track"],
    coverage_track_index = clumpsLocalization_results["coverage_track_index"],
    genome_2bit = clumpsLocalization_results["genome_2bit"],
    fasta = clumpsLocalization_results["fasta"],
    gpmaps = clumpsLocalization_results["gpmaps"]
)
clumpsPost = clumps_postprocess_task()
clumpsPost_results = clumpsPost.run(
    clumps_preprocess = clumpsPrep_results["prep_outdir"],
    clumps_results = [clumpsRun_results["run_outdir"]],
    cancer_genes = clumpsLocalization_results["cancer_genes"],
    uniprot_map = clumpsLocalization_results["uniprot_map"],
    pdb_dir = clumpsLocalization_results["pdb_dir"]
)

[20220315-16:55:51] [prefect] Available disk storage at /mnt/nfs is small (53 GB remaining)
[20220315-16:55:51] [prefect] Starting Slurm controller ...
[20220315-16:55:51] [prefect] Waiting up to 60 seconds for Slurm controller to start ...
[20220315-16:55:52] [prefect] Cannot recover preexisting task outputs: [Errno 2] No such file or directory: '/mnt/nfs/workspace/clumps_pre-processing_task__2022-03-15--16-55-52_ohy20dy_vtg5ndy_xurchlelmcp4c/jobs/0'
[20220315-16:55:52] [prefect] Overwriting output and aborting job avoidance.
[20220315-16:55:52] [prefect] Localizing inputs...
[20220315-16:55:52] [prefect] Job staged on SLURM controller in: /mnt/nfs/workspace/clumps_pre-processing_task__2022-03-15--16-55-52_ohy20dy_vtg5ndy_xurchlelmcp4c
[20220315-16:55:52] [prefect] Preparing pipeline script
[20220315-17:47:13] [prefect] Finished with status COMPLETED
[20220315-17:47:13] [prefect] Available disk storage at /mnt/nfs is small (52 GB remaining)
[20220315-17:47:13] [prefect] Starting Slurm