Permalink
Browse files

CWL: single file tarballs for complex collections

Avoids using complex secondaryFile representations for directories
of files like aligner indices, RTG and snpEff. Instead, creates a
tarball of the files and passes it to jobs as a single file.
secondaryFiles are handled differently across platforms and are not
supported in WDL, so this approach provides better portability. Also
enables snpEff support for effects prediction in CWL.
1 parent 8462682 commit 01d57604d155363a64fc15b43a038d64218452fb @chapmanb committed Jul 10, 2017
View
@@ -1,3 +1,9 @@
+## 1.0.5 (in progress)
+
+- GATK4: fix option usage for gVCF creation with HaplotypeCaller
+- CWL/WDL: use single file tarballs for complex collections of files like
+ aligner, RTG and snpEff indices.
+
## 1.0.4 (9 July 2017)
- Initial support for GATK4 variant calling with HaplotypeCaller and MuTect2.
View
@@ -5,6 +5,7 @@
import json
import operator
import os
+import tarfile
import toolz as tz
import yaml
@@ -43,7 +44,6 @@ def _cwl_workflow_template(inputs, top_level=False):
"hints": [],
"requirements": [{"class": "EnvVarRequirement",
"envDef": [{"envName": "MPLCONFIGDIR", "envValue": "."}]},
- {"class": "InlineJavascriptRequirement"}, # for secondary Files
{"class": "ScatterFeatureRequirement"},
{"class": "SubworkflowFeatureRequirement"}],
"inputs": ready_inputs,
@@ -345,9 +345,7 @@ def _indexes_to_secondary_files(gresources, genome_build):
# directory plus indexes -- snpEff
elif "base" in val and os.path.isdir(val["base"]) and len(val["indexes"]) > 0:
indexes = val["indexes"]
- val = {"base": indexes[0]}
- if len(indexes) > 1:
- val["indexes"] = indexes[1:]
+ val = {"base": indexes[0], "indexes": indexes[1:]}
elif isinstance(val, dict) and genome_build in val:
val = _indexes_to_secondary_files(val, genome_build)
out[refname] = val
@@ -395,10 +393,8 @@ def half_finished_trim(orig, prefix):
# Return extensions relative to original
if not exts_to_remove or exts_to_remove.startswith("."):
return "^" * exts_to_remove.count(".") + ext_to_add
- # Return entire file relative to original
- # no way to cleanly reference dirname without using InlineJavascriptRequirement
- elif prefix.endswith("/"):
- return '$(self.location.substr(0, self.location.lastIndexOf("/")))/%s' % ext_to_add
+ else:
+ raise ValueError("No cross platform way to reference complex extension: %s %s" % (sf, of))
def _get_avro_type(val):
"""Infer avro type for the current input.
@@ -429,8 +425,8 @@ def _get_avro_type(val):
elif val is None:
return ["null", "string"]
# encode booleans as string True/False and unencode on other side
- elif isinstance(val, bool):
- return "string"
+ elif isinstance(val, bool) or isinstance(val, basestring) and val.lower() in ["true", "false", "none"]:
+ return ["string", "null", "boolean"]
elif isinstance(val, int):
return "long"
elif isinstance(val, float):
@@ -474,9 +470,13 @@ def _to_cwldata(key, val):
def _to_cwlfile_with_indexes(val):
"""Convert reads with ready to go indexes into the right CWL object.
+
+ Identifies the top level directory and creates a tarball, avoiding
+ trying to handle complex secondary setups which are not cross platform.
"""
- return {"class": "File", "path": val["base"],
- "secondaryFiles": [{"class": "File", "path": x} for x in val["indexes"]]}
+ dirname = os.path.dirname(val["base"])
+ assert all([x.startswith(dirname) for x in val["indexes"]])
+ return {"class": "File", "path": _directory_tarball(dirname)}
def _item_to_cwldata(x):
""""Markup an item with CWL specific metadata.
@@ -503,31 +503,31 @@ def _item_to_cwldata(x):
if secondary:
out["secondaryFiles"] = [{"class": "File", "path": y} for y in secondary]
else:
- # aligner and database indices where we list the entire directory as secondary files
- dir_targets = ("mainIndex", ".alt", ".amb", ".ann", ".bwt", ".pac", ".sa", ".ebwt", ".bt2",
- "Genome", "GenomeIndex", "GenomeIndexHash", "OverflowTable")
- assert os.path.isdir(x)
- base_name = None
- fnames = sorted(os.listdir(x))
- for fname in fnames:
- if fname.endswith(dir_targets):
- base_name = fname
- break
- if base_name:
- fnames.pop(fnames.index(base_name))
- base_name = os.path.join(x, base_name)
- fnames = [os.path.join(x, y) for y in fnames]
- out = {"class": "File", "path": base_name,
- "secondaryFiles": [{"class": "File", "path": f} for f in fnames]}
- # skip directories we're not currently using in CWL recipes
- else:
- out = None
+ out = {"class": "File", "path": _directory_tarball(x)}
return out
elif isinstance(x, bool):
return str(x)
else:
return x
+def _directory_tarball(dirname):
+ """Create a tarball of a complex directory, avoiding complex secondaryFiles.
+
+ Complex secondary files do not work on multiple platforms and are not portable
+ to WDL, so for now we create a tarball that workers will unpack.
+ """
+ assert os.path.isdir(dirname)
+ base_dir, tarball_dir = os.path.split(dirname)
+ while base_dir and not os.path.exists(os.path.join(base_dir, "seq")):
+ base_dir, extra_tarball = os.path.split(base_dir)
+ tarball_dir = os.path.join(extra_tarball, tarball_dir)
+ tarball = os.path.join(base_dir, "%s-wf.tar.gz" % (tarball_dir.replace(os.path.sep, "--")))
+ if not utils.file_exists(tarball):
+ with utils.chdir(base_dir):
+ with tarfile.open(tarball, "w:gz") as tar:
+ tar.add(tarball_dir)
+ return tarball
+
def _clean_final_outputs(keyvals, integrations=None):
def clean_path(integrations, x):
retriever = _get_retriever(x, integrations)
View
@@ -5,11 +5,14 @@
non-variant calling workflows.
"""
import collections
+import os
import pprint
+import tarfile
import toolz as tz
from bcbio import utils
+from bcbio.pipeline import datadict as dd
def to_rec(samples, default_keys=None):
"""Convert inputs into CWL records, useful for single item parallelization.
@@ -44,6 +47,40 @@ def normalize_missing(xs):
xs = False
return xs
+# aligner and database indices where we list the entire directory as secondary files
+DIR_TARGETS = ("mainIndex", ".alt", ".amb", ".ann", ".bwt", ".pac", ".sa", ".ebwt", ".bt2",
+ "Genome", "GenomeIndex", "GenomeIndexHash", "OverflowTable")
+
+def unpack_tarballs(xs, data, use_subdir=True):
+ """Unpack workflow tarballs into ready to use directories.
+ """
+ if isinstance(xs, dict):
+ for k, v in xs.items():
+ xs[k] = unpack_tarballs(v, data, use_subdir)
+ elif isinstance(xs, (list, tuple)):
+ xs = [unpack_tarballs(x, data, use_subdir) for x in xs]
+ elif isinstance(xs, basestring):
+ if os.path.isfile(xs) and xs.endswith("-wf.tar.gz"):
+ if use_subdir:
+ tarball_dir = utils.safe_makedir(os.path.join(dd.get_work_dir(data), "wf-inputs"))
+ else:
+ tarball_dir = dd.get_work_dir(data)
+ out_dir = os.path.join(tarball_dir,
+ os.path.basename(xs).replace("-wf.tar.gz", "").replace("--", os.path.sep))
+ if not os.path.exists(out_dir):
+ with utils.chdir(tarball_dir):
+ with tarfile.open(xs, "r:gz") as tar:
+ tar.extractall()
+ assert os.path.exists(out_dir), out_dir
+ # Default to representing output directory
+ xs = out_dir
+ # Look for aligner indices
+ for fname in os.listdir(out_dir):
+ if fname.endswith(DIR_TARGETS):
+ xs = os.path.join(out_dir, fname)
+ break
+ return xs
+
def _get_all_cwlkeys(items, default_keys=None):
"""Retrieve cwlkeys from inputs, handling defaults which can be null.
View
@@ -164,7 +164,9 @@ def _variant_vc(checkpoints):
["config", "algorithm", "tools_on"],
["config", "algorithm", "tools_off"],
["reference", "fasta", "base"], ["reference", "rtg"], ["reference", "genome_context"],
- ["genome_resources", "variation", "cosmic"], ["genome_resources", "variation", "dbsnp"]],
+ ["genome_resources", "variation", "cosmic"], ["genome_resources", "variation", "dbsnp"],
+ ["genome_resources", "aliases", "ensembl"], ["genome_resources", "aliases", "human"],
+ ["genome_resources", "aliases", "snpeff"], ["reference", "snpeff", "genome_build"]],
[cwlout("batch_rec", "record")],
"bcbio-base",
cores=1,
View
@@ -269,6 +269,14 @@ def _handle_special_inputs(inputs, variables):
out.append(vid)
found_indexes = True
assert found_indexes, "Found no aligner indexes in %s" % [v["id"] for v in variables]
+ elif input == ["reference", "snpeff", "genome_build"]:
+ found_indexes = False
+ for v in variables:
+ vid = get_base_id(v["id"]).split("__")
+ if vid[0] == "reference" and vid[1] == "snpeff":
+ out.append(vid)
+ found_indexes = True
+ assert found_indexes, "Found no snpEff indexes in %s" % [v["id"] for v in variables]
elif input in optional:
if _get_string_vid(input) in all_vs:
out.append(input)
@@ -16,6 +16,7 @@
from bcbio import log, utils
from bcbio.log import logger
+from bcbio.cwl import cwlutils
from bcbio.distributed import multitasks
from bcbio.pipeline import config_utils, run_info
@@ -364,6 +365,7 @@ def _finalize_cwl_in(data, work_dir, passed_keys, output_cwl_keys, runtime):
data["output_cwl_keys"] = output_cwl_keys
data = _add_resources(data, runtime)
data = run_info.normalize_world(data)
+ data = cwlutils.normalize_missing(data)
return data
def _convert_value(val):
@@ -477,9 +479,6 @@ def _collapse_to_cwl_record(samples, want_attrs):
def _to_cwl(val):
"""Convert a value into CWL formatted JSON, handling files and complex things.
"""
- # aligner and database indices where we list the entire directory as secondary files
- dir_targets = ("mainIndex", ".alt", ".amb", ".ann", ".bwt", ".pac", ".sa", ".ebwt", ".bt2",
- "Genome", "GenomeIndex", "GenomeIndexHash", "OverflowTable")
if isinstance(val, basestring):
if os.path.exists(val) and os.path.isfile(val):
val = {"class": "File", "path": val}
@@ -496,7 +495,7 @@ def _to_cwl(val):
# Handle relative paths
if not cur_dir:
cur_dir = os.getcwd()
- if cur_file.endswith(dir_targets):
+ if cur_file.endswith(cwlutils.DIR_TARGETS):
for fname in os.listdir(cur_dir):
if fname != cur_file:
secondary.append({"class": "File", "path": os.path.join(cur_dir, fname)})
View
@@ -10,6 +10,7 @@
import yaml
from bcbio import utils
+from bcbio.cwl import cwlutils
from bcbio.distributed import objectstore
from bcbio.log import logger
from bcbio.ngsalign import star
@@ -172,6 +173,12 @@ def _get_ref_from_galaxy_loc(name, genome_build, loc_file, galaxy_dt, need_remap
# allow multiple references in a file and use the most recently added
else:
cur_ref = refs[-1]
+ # Find genome directory and check for packed wf tarballs
+ cur_ref_norm = os.path.normpath(utils.add_full_path(cur_ref, galaxy_config["tool_data_path"]))
+ base_dir_i = cur_ref_norm.find("/%s/" % genome_build)
+ base_dir = os.path.join(cur_ref_norm[:base_dir_i], genome_build)
+ for tarball in glob.glob(os.path.join(base_dir, "*-wf.tar.gz")):
+ cwlutils.unpack_tarballs(tarball, {"dirs": {"work": base_dir}}, use_subdir=False)
if need_remap:
assert remap_fn is not None, "%s requires remapping function from base location file" % name
cur_ref = os.path.normpath(utils.add_full_path(cur_ref, galaxy_config["tool_data_path"]))
View
@@ -106,6 +106,7 @@ def process_alignment(data, alt_input=None):
"""Do an alignment of fastq files, preparing a sorted BAM output file.
"""
data = cwlutils.normalize_missing(utils.to_single_data(data))
+ data = cwlutils.unpack_tarballs(data, data)
fastq1, fastq2 = dd.get_input_sequence_files(data)
if alt_input:
fastq1, fastq2 = alt_input
@@ -4,6 +4,7 @@
import toolz as tz
from bcbio import utils
+from bcbio.cwl import cwlutils
from bcbio.log import logger
from bcbio.pipeline import datadict as dd
from bcbio.variation.genotype import variant_filtration, get_variantcaller
@@ -16,12 +17,14 @@ def postprocess_variants(items):
"""Provide post-processing of variant calls: filtering and effects annotation.
"""
data, items = _get_batch_representative(items, "vrn_file")
+ items = cwlutils.unpack_tarballs(items, data)
+ data = cwlutils.unpack_tarballs(data, data)
cur_name = "%s, %s" % (dd.get_sample_name(data), get_variantcaller(data))
logger.info("Finalizing variant calls: %s" % cur_name)
orig_vrn_file = data.get("vrn_file")
data = _symlink_to_workdir(data, ["vrn_file"])
data = _symlink_to_workdir(data, ["config", "algorithm", "variant_regions"])
- if data.get("align_bam") and data.get("vrn_file"):
+ if data.get("vrn_file"):
logger.info("Calculating variation effects for %s" % cur_name)
ann_vrn_file, vrn_stats = effects.add_to_vcf(data["vrn_file"], data)
if ann_vrn_file:
@@ -40,8 +43,9 @@ def postprocess_variants(items):
logger.info("Germline extraction for %s" % cur_name)
data = germline.extract(data, orig_items)
- data = damage.run_filter(data["vrn_file"], dd.get_align_bam(data), dd.get_ref_file(data),
- data, orig_items)
+ if dd.get_align_bam(data):
+ data = damage.run_filter(data["vrn_file"], dd.get_align_bam(data), dd.get_ref_file(data),
+ data, orig_items)
if orig_vrn_file and os.path.samefile(data["vrn_file"], orig_vrn_file):
data["vrn_file"] = orig_vrn_file
return [[data]]
@@ -275,10 +275,15 @@ def get_db(data):
snpeff_db = utils.get_in(data, ("genome_resources", "aliases", "snpeff"))
snpeff_base_dir = None
if snpeff_db:
- snpeff_base_dir = utils.get_in(data, ("reference", "snpeff", snpeff_db, "base"))
+ snpeff_base_dir = utils.get_in(data, ("reference", "snpeff", snpeff_db))
if not snpeff_base_dir:
# We need to mask '.' characters for CWL/WDL processing, check for them here
- snpeff_base_dir = utils.get_in(data, ("reference", "snpeff", snpeff_db.replace("_", "."), "base"))
+ snpeff_base_dir = utils.get_in(data, ("reference", "snpeff", snpeff_db.replace("_", ".")))
+ if isinstance(snpeff_base_dir, dict) and snpeff_base_dir.get("base"):
+ snpeff_base_dir = snpeff_base_dir["base"]
+ if (snpeff_base_dir and isinstance(snpeff_base_dir, basestring)
+ and snpeff_base_dir.endswith("%s%s" % (os.path.sep, snpeff_db))):
+ snpeff_base_dir = os.path.dirname(snpeff_base_dir)
if not snpeff_base_dir:
ref_file = utils.get_in(data, ("reference", "fasta", "base"))
snpeff_base_dir = utils.safe_makedir(os.path.normpath(os.path.join(
@@ -33,7 +33,7 @@ def _get_validate(data):
"""Retrieve items to validate, from single samples or from combined joint calls.
"""
if data.get("vrn_file") and tz.get_in(["config", "algorithm", "validate"], data):
- return data
+ return utils.deepish_copy(data)
elif "group_orig" in data:
for sub in multi.get_orig_items(data):
if "validate" in sub["config"]["algorithm"]:
@@ -91,13 +91,15 @@ def _normalize_cwl_inputs(items):
"""
with_validate = {}
vrn_files = []
+ ready_items = []
for data in (cwlutils.normalize_missing(utils.to_single_data(d)) for d in items):
if tz.get_in(["config", "algorithm", "validate"], data):
with_validate[_checksum(tz.get_in(["config", "algorithm", "validate"], data))] = data
if data.get("vrn_file"):
vrn_files.append(data["vrn_file"])
+ ready_items.append(data)
if len(with_validate) == 0:
- return items[0]
+ return ready_items[0]
else:
assert len(with_validate) == 1, len(with_validate)
assert len(set(vrn_files)) == 1
@@ -120,6 +122,7 @@ def compare_to_rm(data):
if isinstance(data, (list, tuple)):
data = _normalize_cwl_inputs(data)
toval_data = _get_validate(data)
+ toval_data = cwlutils.unpack_tarballs(toval_data, toval_data)
if toval_data:
caller = _get_caller(toval_data)
sample = dd.get_sample_name(toval_data)
View
@@ -4,7 +4,7 @@
import os
from setuptools import setup, find_packages
-version = "1.0.4"
+version = "1.0.5a0"
def write_version_py():
version_py = os.path.join(os.path.dirname(__file__), 'bcbio', 'pipeline',

0 comments on commit 01d5760

Please sign in to comment.