From 01d57604d155363a64fc15b43a038d64218452fb Mon Sep 17 00:00:00 2001 From: chapmanb Date: Mon, 10 Jul 2017 04:12:11 -0400 Subject: [PATCH] CWL: single file tarballs for complex collections Avoids using complex secondaryFile representations for directories of files like aligner indices, RTG and snpEff. Instead create a tarball of files and pass to jobs as a single file. secondaryFiles are handled differently across platforms and not supported on WDL, so provides better portability. Also enables snpEff support for effects prediction in CWL. --- HISTORY.md | 6 +++++ bcbio/cwl/create.py | 62 ++++++++++++++++++++++----------------------- bcbio/cwl/cwlutils.py | 37 +++++++++++++++++++++++++++ bcbio/cwl/defs.py | 4 ++- bcbio/cwl/workflow.py | 8 ++++++ bcbio/distributed/runfn.py | 7 +++-- bcbio/pipeline/genome.py | 7 +++++ bcbio/pipeline/sample.py | 1 + bcbio/pipeline/variation.py | 10 +++++--- bcbio/variation/effects.py | 9 +++++-- bcbio/variation/validate.py | 7 +++-- setup.py | 2 +- 12 files changed, 116 insertions(+), 44 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 9a949d8f0..04240203b 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,3 +1,9 @@ +## 1.0.5 (in progress) + +- GATK4: fix option usage for gVCF creation with HaplotypeCaller +- CWL/WDL: use single file tarballs for complex collections of files like + aligner, RTG and snpEff indices. + ## 1.0.4 (9 July 2017) - Initial support for GATK4 variant calling with HaplotypeCaller and MuTect2. diff --git a/bcbio/cwl/create.py b/bcbio/cwl/create.py index 63ae7746e..064d8d8a2 100644 --- a/bcbio/cwl/create.py +++ b/bcbio/cwl/create.py @@ -5,6 +5,7 @@ import json import operator import os +import tarfile import toolz as tz import yaml @@ -43,7 +44,6 @@ def _cwl_workflow_template(inputs, top_level=False): "hints": [], "requirements": [{"class": "EnvVarRequirement", "envDef": [{"envName": "MPLCONFIGDIR", "envValue": "."}]}, - {"class": "InlineJavascriptRequirement"}, # for secondary Files {"class": "ScatterFeatureRequirement"}, {"class": "SubworkflowFeatureRequirement"}], "inputs": ready_inputs, @@ -345,9 +345,7 @@ def _indexes_to_secondary_files(gresources, genome_build): # directory plus indexes -- snpEff elif "base" in val and os.path.isdir(val["base"]) and len(val["indexes"]) > 0: indexes = val["indexes"] - val = {"base": indexes[0]} - if len(indexes) > 1: - val["indexes"] = indexes[1:] + val = {"base": indexes[0], "indexes": indexes[1:]} elif isinstance(val, dict) and genome_build in val: val = _indexes_to_secondary_files(val, genome_build) out[refname] = val @@ -395,10 +393,8 @@ def half_finished_trim(orig, prefix): # Return extensions relative to original if not exts_to_remove or exts_to_remove.startswith("."): return "^" * exts_to_remove.count(".") + ext_to_add - # Return entire file relative to original - # no way to cleanly reference dirname without using InlineJavascriptRequirement - elif prefix.endswith("/"): - return '$(self.location.substr(0, self.location.lastIndexOf("/")))/%s' % ext_to_add + else: + raise ValueError("No cross platform way to reference complex extension: %s %s" % (sf, of)) def _get_avro_type(val): """Infer avro type for the current input. @@ -429,8 +425,8 @@ def _get_avro_type(val): elif val is None: return ["null", "string"] # encode booleans as string True/False and unencode on other side - elif isinstance(val, bool): - return "string" + elif isinstance(val, bool) or isinstance(val, basestring) and val.lower() in ["true", "false", "none"]: + return ["string", "null", "boolean"] elif isinstance(val, int): return "long" elif isinstance(val, float): @@ -474,9 +470,13 @@ def _to_cwldata(key, val): def _to_cwlfile_with_indexes(val): """Convert reads with ready to go indexes into the right CWL object. + + Identifies the top level directory and creates a tarball, avoiding + trying to handle complex secondary setups which are not cross platform. """ - return {"class": "File", "path": val["base"], - "secondaryFiles": [{"class": "File", "path": x} for x in val["indexes"]]} + dirname = os.path.dirname(val["base"]) + assert all([x.startswith(dirname) for x in val["indexes"]]) + return {"class": "File", "path": _directory_tarball(dirname)} def _item_to_cwldata(x): """"Markup an item with CWL specific metadata. @@ -503,31 +503,31 @@ def _item_to_cwldata(x): if secondary: out["secondaryFiles"] = [{"class": "File", "path": y} for y in secondary] else: - # aligner and database indices where we list the entire directory as secondary files - dir_targets = ("mainIndex", ".alt", ".amb", ".ann", ".bwt", ".pac", ".sa", ".ebwt", ".bt2", - "Genome", "GenomeIndex", "GenomeIndexHash", "OverflowTable") - assert os.path.isdir(x) - base_name = None - fnames = sorted(os.listdir(x)) - for fname in fnames: - if fname.endswith(dir_targets): - base_name = fname - break - if base_name: - fnames.pop(fnames.index(base_name)) - base_name = os.path.join(x, base_name) - fnames = [os.path.join(x, y) for y in fnames] - out = {"class": "File", "path": base_name, - "secondaryFiles": [{"class": "File", "path": f} for f in fnames]} - # skip directories we're not currently using in CWL recipes - else: - out = None + out = {"class": "File", "path": _directory_tarball(x)} return out elif isinstance(x, bool): return str(x) else: return x +def _directory_tarball(dirname): + """Create a tarball of a complex directory, avoiding complex secondaryFiles. + + Complex secondary files do not work on multiple platforms and are not portable + to WDL, so for now we create a tarball that workers will unpack. + """ + assert os.path.isdir(dirname) + base_dir, tarball_dir = os.path.split(dirname) + while base_dir and not os.path.exists(os.path.join(base_dir, "seq")): + base_dir, extra_tarball = os.path.split(base_dir) + tarball_dir = os.path.join(extra_tarball, tarball_dir) + tarball = os.path.join(base_dir, "%s-wf.tar.gz" % (tarball_dir.replace(os.path.sep, "--"))) + if not utils.file_exists(tarball): + with utils.chdir(base_dir): + with tarfile.open(tarball, "w:gz") as tar: + tar.add(tarball_dir) + return tarball + def _clean_final_outputs(keyvals, integrations=None): def clean_path(integrations, x): retriever = _get_retriever(x, integrations) diff --git a/bcbio/cwl/cwlutils.py b/bcbio/cwl/cwlutils.py index dc493cd63..cd4d354c7 100644 --- a/bcbio/cwl/cwlutils.py +++ b/bcbio/cwl/cwlutils.py @@ -5,11 +5,14 @@ non-variant calling workflows. """ import collections +import os import pprint +import tarfile import toolz as tz from bcbio import utils +from bcbio.pipeline import datadict as dd def to_rec(samples, default_keys=None): """Convert inputs into CWL records, useful for single item parallelization. @@ -44,6 +47,40 @@ def normalize_missing(xs): xs = False return xs +# aligner and database indices where we list the entire directory as secondary files +DIR_TARGETS = ("mainIndex", ".alt", ".amb", ".ann", ".bwt", ".pac", ".sa", ".ebwt", ".bt2", + "Genome", "GenomeIndex", "GenomeIndexHash", "OverflowTable") + +def unpack_tarballs(xs, data, use_subdir=True): + """Unpack workflow tarballs into ready to use directories. + """ + if isinstance(xs, dict): + for k, v in xs.items(): + xs[k] = unpack_tarballs(v, data, use_subdir) + elif isinstance(xs, (list, tuple)): + xs = [unpack_tarballs(x, data, use_subdir) for x in xs] + elif isinstance(xs, basestring): + if os.path.isfile(xs) and xs.endswith("-wf.tar.gz"): + if use_subdir: + tarball_dir = utils.safe_makedir(os.path.join(dd.get_work_dir(data), "wf-inputs")) + else: + tarball_dir = dd.get_work_dir(data) + out_dir = os.path.join(tarball_dir, + os.path.basename(xs).replace("-wf.tar.gz", "").replace("--", os.path.sep)) + if not os.path.exists(out_dir): + with utils.chdir(tarball_dir): + with tarfile.open(xs, "r:gz") as tar: + tar.extractall() + assert os.path.exists(out_dir), out_dir + # Default to representing output directory + xs = out_dir + # Look for aligner indices + for fname in os.listdir(out_dir): + if fname.endswith(DIR_TARGETS): + xs = os.path.join(out_dir, fname) + break + return xs + def _get_all_cwlkeys(items, default_keys=None): """Retrieve cwlkeys from inputs, handling defaults which can be null. diff --git a/bcbio/cwl/defs.py b/bcbio/cwl/defs.py index 32b844dc9..f5390a634 100644 --- a/bcbio/cwl/defs.py +++ b/bcbio/cwl/defs.py @@ -164,7 +164,9 @@ def _variant_vc(checkpoints): ["config", "algorithm", "tools_on"], ["config", "algorithm", "tools_off"], ["reference", "fasta", "base"], ["reference", "rtg"], ["reference", "genome_context"], - ["genome_resources", "variation", "cosmic"], ["genome_resources", "variation", "dbsnp"]], + ["genome_resources", "variation", "cosmic"], ["genome_resources", "variation", "dbsnp"], + ["genome_resources", "aliases", "ensembl"], ["genome_resources", "aliases", "human"], + ["genome_resources", "aliases", "snpeff"], ["reference", "snpeff", "genome_build"]], [cwlout("batch_rec", "record")], "bcbio-base", cores=1, diff --git a/bcbio/cwl/workflow.py b/bcbio/cwl/workflow.py index 6293fe41e..b7ff5e55d 100644 --- a/bcbio/cwl/workflow.py +++ b/bcbio/cwl/workflow.py @@ -269,6 +269,14 @@ def _handle_special_inputs(inputs, variables): out.append(vid) found_indexes = True assert found_indexes, "Found no aligner indexes in %s" % [v["id"] for v in variables] + elif input == ["reference", "snpeff", "genome_build"]: + found_indexes = False + for v in variables: + vid = get_base_id(v["id"]).split("__") + if vid[0] == "reference" and vid[1] == "snpeff": + out.append(vid) + found_indexes = True + assert found_indexes, "Found no snpEff indexes in %s" % [v["id"] for v in variables] elif input in optional: if _get_string_vid(input) in all_vs: out.append(input) diff --git a/bcbio/distributed/runfn.py b/bcbio/distributed/runfn.py index d865d72d0..e109eb9e2 100644 --- a/bcbio/distributed/runfn.py +++ b/bcbio/distributed/runfn.py @@ -16,6 +16,7 @@ from bcbio import log, utils from bcbio.log import logger +from bcbio.cwl import cwlutils from bcbio.distributed import multitasks from bcbio.pipeline import config_utils, run_info @@ -364,6 +365,7 @@ def _finalize_cwl_in(data, work_dir, passed_keys, output_cwl_keys, runtime): data["output_cwl_keys"] = output_cwl_keys data = _add_resources(data, runtime) data = run_info.normalize_world(data) + data = cwlutils.normalize_missing(data) return data def _convert_value(val): @@ -477,9 +479,6 @@ def _collapse_to_cwl_record(samples, want_attrs): def _to_cwl(val): """Convert a value into CWL formatted JSON, handling files and complex things. """ - # aligner and database indices where we list the entire directory as secondary files - dir_targets = ("mainIndex", ".alt", ".amb", ".ann", ".bwt", ".pac", ".sa", ".ebwt", ".bt2", - "Genome", "GenomeIndex", "GenomeIndexHash", "OverflowTable") if isinstance(val, basestring): if os.path.exists(val) and os.path.isfile(val): val = {"class": "File", "path": val} @@ -496,7 +495,7 @@ def _to_cwl(val): # Handle relative paths if not cur_dir: cur_dir = os.getcwd() - if cur_file.endswith(dir_targets): + if cur_file.endswith(cwlutils.DIR_TARGETS): for fname in os.listdir(cur_dir): if fname != cur_file: secondary.append({"class": "File", "path": os.path.join(cur_dir, fname)}) diff --git a/bcbio/pipeline/genome.py b/bcbio/pipeline/genome.py index c44f5d12c..52dfeeb39 100644 --- a/bcbio/pipeline/genome.py +++ b/bcbio/pipeline/genome.py @@ -10,6 +10,7 @@ import yaml from bcbio import utils +from bcbio.cwl import cwlutils from bcbio.distributed import objectstore from bcbio.log import logger from bcbio.ngsalign import star @@ -172,6 +173,12 @@ def _get_ref_from_galaxy_loc(name, genome_build, loc_file, galaxy_dt, need_remap # allow multiple references in a file and use the most recently added else: cur_ref = refs[-1] + # Find genome directory and check for packed wf tarballs + cur_ref_norm = os.path.normpath(utils.add_full_path(cur_ref, galaxy_config["tool_data_path"])) + base_dir_i = cur_ref_norm.find("/%s/" % genome_build) + base_dir = os.path.join(cur_ref_norm[:base_dir_i], genome_build) + for tarball in glob.glob(os.path.join(base_dir, "*-wf.tar.gz")): + cwlutils.unpack_tarballs(tarball, {"dirs": {"work": base_dir}}, use_subdir=False) if need_remap: assert remap_fn is not None, "%s requires remapping function from base location file" % name cur_ref = os.path.normpath(utils.add_full_path(cur_ref, galaxy_config["tool_data_path"])) diff --git a/bcbio/pipeline/sample.py b/bcbio/pipeline/sample.py index 482f5beae..eceeff20e 100644 --- a/bcbio/pipeline/sample.py +++ b/bcbio/pipeline/sample.py @@ -106,6 +106,7 @@ def process_alignment(data, alt_input=None): """Do an alignment of fastq files, preparing a sorted BAM output file. """ data = cwlutils.normalize_missing(utils.to_single_data(data)) + data = cwlutils.unpack_tarballs(data, data) fastq1, fastq2 = dd.get_input_sequence_files(data) if alt_input: fastq1, fastq2 = alt_input diff --git a/bcbio/pipeline/variation.py b/bcbio/pipeline/variation.py index a1556b622..0ca3411b5 100644 --- a/bcbio/pipeline/variation.py +++ b/bcbio/pipeline/variation.py @@ -4,6 +4,7 @@ import toolz as tz from bcbio import utils +from bcbio.cwl import cwlutils from bcbio.log import logger from bcbio.pipeline import datadict as dd from bcbio.variation.genotype import variant_filtration, get_variantcaller @@ -16,12 +17,14 @@ def postprocess_variants(items): """Provide post-processing of variant calls: filtering and effects annotation. """ data, items = _get_batch_representative(items, "vrn_file") + items = cwlutils.unpack_tarballs(items, data) + data = cwlutils.unpack_tarballs(data, data) cur_name = "%s, %s" % (dd.get_sample_name(data), get_variantcaller(data)) logger.info("Finalizing variant calls: %s" % cur_name) orig_vrn_file = data.get("vrn_file") data = _symlink_to_workdir(data, ["vrn_file"]) data = _symlink_to_workdir(data, ["config", "algorithm", "variant_regions"]) - if data.get("align_bam") and data.get("vrn_file"): + if data.get("vrn_file"): logger.info("Calculating variation effects for %s" % cur_name) ann_vrn_file, vrn_stats = effects.add_to_vcf(data["vrn_file"], data) if ann_vrn_file: @@ -40,8 +43,9 @@ def postprocess_variants(items): logger.info("Germline extraction for %s" % cur_name) data = germline.extract(data, orig_items) - data = damage.run_filter(data["vrn_file"], dd.get_align_bam(data), dd.get_ref_file(data), - data, orig_items) + if dd.get_align_bam(data): + data = damage.run_filter(data["vrn_file"], dd.get_align_bam(data), dd.get_ref_file(data), + data, orig_items) if orig_vrn_file and os.path.samefile(data["vrn_file"], orig_vrn_file): data["vrn_file"] = orig_vrn_file return [[data]] diff --git a/bcbio/variation/effects.py b/bcbio/variation/effects.py index ac41e786d..7dd0008ef 100644 --- a/bcbio/variation/effects.py +++ b/bcbio/variation/effects.py @@ -275,10 +275,15 @@ def get_db(data): snpeff_db = utils.get_in(data, ("genome_resources", "aliases", "snpeff")) snpeff_base_dir = None if snpeff_db: - snpeff_base_dir = utils.get_in(data, ("reference", "snpeff", snpeff_db, "base")) + snpeff_base_dir = utils.get_in(data, ("reference", "snpeff", snpeff_db)) if not snpeff_base_dir: # We need to mask '.' characters for CWL/WDL processing, check for them here - snpeff_base_dir = utils.get_in(data, ("reference", "snpeff", snpeff_db.replace("_", "."), "base")) + snpeff_base_dir = utils.get_in(data, ("reference", "snpeff", snpeff_db.replace("_", "."))) + if isinstance(snpeff_base_dir, dict) and snpeff_base_dir.get("base"): + snpeff_base_dir = snpeff_base_dir["base"] + if (snpeff_base_dir and isinstance(snpeff_base_dir, basestring) + and snpeff_base_dir.endswith("%s%s" % (os.path.sep, snpeff_db))): + snpeff_base_dir = os.path.dirname(snpeff_base_dir) if not snpeff_base_dir: ref_file = utils.get_in(data, ("reference", "fasta", "base")) snpeff_base_dir = utils.safe_makedir(os.path.normpath(os.path.join( diff --git a/bcbio/variation/validate.py b/bcbio/variation/validate.py index dc0c8b9ba..79cfdbd70 100644 --- a/bcbio/variation/validate.py +++ b/bcbio/variation/validate.py @@ -33,7 +33,7 @@ def _get_validate(data): """Retrieve items to validate, from single samples or from combined joint calls. """ if data.get("vrn_file") and tz.get_in(["config", "algorithm", "validate"], data): - return data + return utils.deepish_copy(data) elif "group_orig" in data: for sub in multi.get_orig_items(data): if "validate" in sub["config"]["algorithm"]: @@ -91,13 +91,15 @@ def _normalize_cwl_inputs(items): """ with_validate = {} vrn_files = [] + ready_items = [] for data in (cwlutils.normalize_missing(utils.to_single_data(d)) for d in items): if tz.get_in(["config", "algorithm", "validate"], data): with_validate[_checksum(tz.get_in(["config", "algorithm", "validate"], data))] = data if data.get("vrn_file"): vrn_files.append(data["vrn_file"]) + ready_items.append(data) if len(with_validate) == 0: - return items[0] + return ready_items[0] else: assert len(with_validate) == 1, len(with_validate) assert len(set(vrn_files)) == 1 @@ -120,6 +122,7 @@ def compare_to_rm(data): if isinstance(data, (list, tuple)): data = _normalize_cwl_inputs(data) toval_data = _get_validate(data) + toval_data = cwlutils.unpack_tarballs(toval_data, toval_data) if toval_data: caller = _get_caller(toval_data) sample = dd.get_sample_name(toval_data) diff --git a/setup.py b/setup.py index 767a9347b..3d5b7c79c 100755 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ import os from setuptools import setup, find_packages -version = "1.0.4" +version = "1.0.5a0" def write_version_py(): version_py = os.path.join(os.path.dirname(__file__), 'bcbio', 'pipeline',