Spark Improvements in 4.1: Delivering results faster!
By Tom White & James Emery
With GATK 4.1, we continue to improve our support for users who want to run GATK on Apache Spark. Spark is a popular engine for large-scale data processing that can be deployed on a cluster or on a standalone machine with multiple cores. The latter is a good way to speed up GATK tools without having to invest in a cluster.
The newest release introduces MarkDuplicatesSpark, newly out of beta, as a replacement for Picard MarkDuplicates + SortSam that has been redesigned to take advantage of multicore parallelism and to run on cloud platforms. When run on large multicore machines on Google Compute Engine, the new and improved version is cost-competitive with Picard MarkDuplicates + SortSam while being significantly faster overall. Other Spark improvements include the full ReadsPipelineSpark, powered by a brand new Spark I/O library, Disq.
MarkDuplicatesSpark is a replacement for the old MarkDuplicates + SortSam steps of the best practices pipeline, neither of which takes advantage of multicore parallelism. For 4.1 we redesigned MarkDuplicatesSpark to 1) achieve good parallelism with up to 16 cores on single-machine instances and 2) limit the number of Spark shuffles performed and reduce the size of the data that must be parallelized. One challenge with the previous pipeline version was that we often couldn’t afford to run Picard MarkDuplicates or SortSam on preemptible instances, because at five hours the chance of getting preempted was too high. Since MarkDuplicatesSpark finishes in only 3-4 hours on a typical BAM, we can justify running it on preemptible instances, a significant cost reduction compared to Picard. MarkDuplicatesSpark, the Spark version of Picard’s MarkDuplicates tool, is now certified to run on Spark for production workloads, and it scales very well even when run in local mode using the available cores of a single machine.
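To make the preemption argument concrete, here is a rough back-of-the-envelope model in Python. It rests on two simplifying assumptions that are not measured properties of Google Compute Engine: preemptions arrive at a constant hourly rate, and a preempted job restarts from scratch. The rate used below is a made-up illustration value, and both functions are hypothetical helpers.

```python
import math


def survival_probability(runtime_hours: float, preempt_rate_per_hour: float) -> float:
    """Chance that a single attempt finishes before being preempted.

    Assumes preemptions follow a Poisson process with a constant hourly rate.
    """
    return math.exp(-preempt_rate_per_hour * runtime_hours)


def expected_hours(runtime_hours: float, preempt_rate_per_hour: float) -> float:
    """Expected total machine-hours until one attempt completes,
    when every preemption forces a restart from scratch."""
    if preempt_rate_per_hour == 0:
        return runtime_hours
    # Closed form for restart-from-scratch: (e^(rate * T) - 1) / rate
    return math.expm1(preempt_rate_per_hour * runtime_hours) / preempt_rate_per_hour


if __name__ == "__main__":
    rate = 0.1  # hypothetical preemptions per hour, for illustration only
    for hours in (3.5, 5.0):
        print(
            f"{hours:.1f} h job: "
            f"P(finish uninterrupted) = {survival_probability(hours, rate):.2f}, "
            f"expected machine-hours = {expected_hours(hours, rate):.2f}"
        )
```

The only point of the model is that the restart overhead grows faster than linearly with runtime, which is why a shorter job makes preemptible instances more attractive.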
Here is a graph of cost versus runtime for MarkDuplicatesSpark and the equivalent Picard tools.
Note that for some 4- and 8-core machine configurations, MarkDuplicatesSpark’s costs are comparable to, and its runtimes about a third of, those of the combined MarkDuplicates + SortSam (the dot on the right of the graph) for non-preemptible machines.
For larger pipelines that require more computational resources, running on a cluster is best. ReadsPipelineSpark, a pure-Spark workflow combining MarkDuplicates, BQSR, and HaplotypeCaller, can take advantage of a Spark cluster for greatly improved runtimes. Using a 20-node Google Dataproc cluster, we were able to cut the runtime of this pipeline on a 30x WGS input BAM from 94 minutes to 55 minutes. We achieved this speedup in large part by using the Spark “Auxiliary files” feature to load side inputs like VCF and reference files. Previously these files were broadcast to Spark workers and stored in memory. Now we take advantage of Spark’s built-in mechanism for efficiently distributing files across the cluster, and then read them from each worker’s local disk. GATK code was already optimized for the typical access patterns for reading features from local files, so the new approach gives a significant speed boost. Scripts and tuning suggestions for running the pipeline on Google Dataproc are available in the GATK GitHub repository.
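As an illustration of shipping a side input as a file rather than broadcasting it, here is a minimal PySpark sketch. It assumes the feature referred to above corresponds to Spark's `SparkContext.addFile` / `SparkFiles.get` API. GATK itself is written in Java and its actual code will differ. The helper names and the line-counting task are hypothetical stand-ins for reading features from a VCF.

```python
import os
from typing import Iterable, Iterator, List


def count_lines_with_prefix(local_path: str, prefixes: Iterable[str]) -> int:
    """Read a side-input file from local disk and count lines starting with any prefix.

    Stands in for "look up features in a local VCF"; plain Python, no Spark needed.
    """
    wanted = tuple(prefixes)
    with open(local_path) as handle:
        return sum(1 for line in handle if line.startswith(wanted))


def count_per_partition(sc, side_input_path: str, contigs: List[str], num_slices: int = 4) -> int:
    """Distribute `side_input_path` to the workers once, then read it locally per partition.

    `sc` is an existing pyspark SparkContext. pyspark is imported lazily so that
    the helper above can be used without a Spark installation.
    """
    from pyspark import SparkFiles

    # Ship the file to every node instead of broadcasting its parsed contents.
    sc.addFile(side_input_path)
    file_name = os.path.basename(side_input_path)

    def process(partition: Iterator[str]) -> Iterator[int]:
        # On the worker, resolve the local copy that Spark downloaded.
        local_copy = SparkFiles.get(file_name)
        yield count_lines_with_prefix(local_copy, list(partition))

    return sc.parallelize(contigs, num_slices).mapPartitions(process).sum()
```

Each partition opens the local copy with ordinary file I/O, so code that already reads efficiently from local files can be reused unchanged on the workers.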
We have devoted a lot of effort to ensuring that the output of the newest versions of two of our core tools, MarkDuplicatesSpark and HaplotypeCallerSpark, matches that of their non-Spark equivalents, despite boundary effects that come into play because Spark processes data in parallel. The new Spark tool RevertSamSpark replaces the Picard tool RevertSam, and MarkDuplicatesSpark now produces output identical to the Picard version. In releases prior to 4.1, HaplotypeCallerSpark output was not always close to the non-Spark version, so in 4.1 we have added a “strict” mode that matches the regular HaplotypeCaller exactly at the expense of runtime. With “strict” mode disabled, HaplotypeCallerSpark still produces almost identical output; the small remaining differences are due to sharding boundary effects.
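For readers who script their runs, the sketch below assembles a HaplotypeCallerSpark command line in Python. The argument names (`--strict`, `--spark-runner`, `--spark-master`) should be checked against `gatk HaplotypeCallerSpark --help` for your release. The file names are placeholders, and `build_haplotypecaller_spark_cmd` is a hypothetical helper, not part of GATK.

```python
import shlex
from typing import List, Optional


def build_haplotypecaller_spark_cmd(
    reference: str,
    input_bam: str,
    output_vcf: str,
    strict: bool = False,
    local_cores: Optional[int] = None,
) -> List[str]:
    """Assemble (but do not run) a HaplotypeCallerSpark invocation."""
    cmd = [
        "gatk", "HaplotypeCallerSpark",
        "-R", reference,
        "-I", input_bam,
        "-O", output_vcf,
    ]
    if strict:
        # Trade runtime for output that matches the non-Spark HaplotypeCaller.
        cmd.append("--strict")
    if local_cores is not None:
        # Arguments after "--" configure Spark; local[N] uses N cores on one machine.
        cmd += ["--", "--spark-runner", "LOCAL", "--spark-master", f"local[{local_cores}]"]
    return cmd


if __name__ == "__main__":
    cmd = build_haplotypecaller_spark_cmd(
        "ref.fasta", "sample.bam", "sample.vcf", strict=True, local_cores=16
    )
    print(shlex.join(cmd))
```

Leaving `strict` at its default corresponds to the faster mode described above, where output may differ slightly at shard boundaries.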
Spark-native replacement for the Hadoop-BAM MapReduce library
Prior to 4.1, GATK relied on the Hadoop-BAM MapReduce library for reading and writing BAM files (and some other common formats like CRAM, SAM, and VCF). Over the last year, we have been working on a Spark-native replacement, Disq, that is more accurate and performs better, particularly when using cloud stores like Google Cloud Storage. Disq is the default in GATK 4.1, making all GATK Spark tools faster across the board.
The new Disq codebase also enables features that optimize parallel processing. On-the-fly indexing produces BAI files in parallel when Spark tools write BAM files. Support for the new Splitting BAM Index (SBI) format, which has been proposed as an addition to the SAM specification, facilitates reading BAM files in parallel.
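To show why a splitting index helps with parallel reads, here is a simplified Python sketch. It assumes an index that is just a sorted list of byte offsets at which records are known to start, plus the file length. The real SBI format records BGZF virtual file offsets, and Disq's implementation is not reproduced here. `plan_splits` is a hypothetical helper.

```python
from bisect import bisect_left
from typing import List, Tuple


def plan_splits(
    record_offsets: List[int], file_length: int, split_size: int
) -> List[Tuple[int, int]]:
    """Turn fixed-size byte splits into ranges that begin on record boundaries.

    Each nominal split start (0, split_size, 2 * split_size, ...) is snapped
    forward to the first indexed record start at or after it, so every worker
    can begin decoding without scanning for a record start.
    """
    boundaries: List[int] = []
    for nominal_start in range(0, file_length, split_size):
        i = bisect_left(record_offsets, nominal_start)
        if i == len(record_offsets):
            break  # no indexed record starts at or after this point
        start = record_offsets[i]
        # Skip nominal splits that snap to a boundary we already have.
        if not boundaries or start != boundaries[-1]:
            boundaries.append(start)
    # Consecutive boundaries delimit the ranges; the last range runs to end of file.
    ends = boundaries[1:] + [file_length]
    return list(zip(boundaries, ends))


if __name__ == "__main__":
    # Toy index: a record is known to start at each of these byte offsets.
    offsets = [0, 120, 260, 410, 530, 700, 880]
    for start, end in plan_splits(offsets, file_length=1000, split_size=300):
        print(f"worker reads bytes [{start}, {end})")
```

Because the ranges are contiguous and non-overlapping, each record is read by exactly one worker in this toy model.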
We constantly strive to create and improve methods and to make them accessible to the scientific community to drive biomedical research. Stay tuned for more new and improved GATK tools!