
Use SparkFiles to localize reference and known sites #5127

Merged
merged 6 commits into master from tw_spark_files_optimization on Oct 2, 2018

Conversation

tomwhite (Contributor)

... for BaseRecalibratorSpark, BQSRPipelineSpark, and ReadsPipelineSpark, and to localize the reference for HaplotypeCallerSpark.

As a consequence of this change, the reference can be a standard fasta file for Spark
tools (in fact, 2bit is no longer supported, since htsjdk has no support for it).

Also:

  • Add AutoCloseableCollection to automatically close all objects in a collection.
  • Add CloseAtEndIterator to call close on an object at the end of the iteration.
  • Don't materialize all reads in memory in ApplyBQSRSparkFn (see the sketch below).

See #5103
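For readers unfamiliar with the mechanism, here is a minimal sketch of the SparkFiles localization pattern this change adopts (the path and the trivial driver setup are illustrative, not GATK code):

import java.util.Collections;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaSparkContext;

public final class SparkFilesSketch {
    public static void main(String[] args) {
        final JavaSparkContext ctx = new JavaSparkContext("local[*]", "sparkfiles-demo");
        // Driver side: register the file once; Spark ships one copy to each executor.
        ctx.addFile("/path/to/reference.fasta");  // hypothetical path
        // Executor side (inside a task closure): resolve the local copy by bare file name.
        ctx.parallelize(Collections.singletonList(1))
           .foreach(i -> System.out.println(SparkFiles.get("reference.fasta")));
        ctx.stop();
    }
}

And a hedged sketch of the "don't materialize all reads" change in ApplyBQSRSparkFn: transform each partition lazily through its iterator instead of collecting reads into a list first (applyBqsrToRead and the reads RDD are stand-ins, not the actual GATK code):

import com.google.common.collect.Iterators;
import org.apache.spark.api.java.JavaRDD;
import org.broadinstitute.hellbender.utils.read.GATKRead;

// Before (conceptually): a whole partition was pulled into an in-memory List.
// After: wrap the partition iterator so reads stream through the transform one at a time.
final JavaRDD<GATKRead> recalibrated = reads.mapPartitions(readIterator ->
        Iterators.transform(readIterator, read -> applyBqsrToRead(read)));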

@tomwhite (Contributor, Author)

@lbergelson please take a look at this when you can. It reduces the exome reads pipeline runtime on Spark from 24 min to 12 min, and the genome pipeline from 94 min to 55 min on the same clusters.

@droazen droazen requested review from jamesemery and removed request for lbergelson August 24, 2018 16:45
@droazen (Collaborator)

droazen commented Aug 24, 2018

@jamesemery please review in @lbergelson 's absence

@jamesemery (Collaborator) left a comment

We should probably keep the BROADCAST option for BQSR, at least for profiling purposes (so we have something to compare against). I know that will involve some work, but it is probably better to spin it out into a second branch until we are convinced. My other comment is to ask whether you have tested the new behavior on single many-core machine types at all. There may be issues when 32 or 64 workers all attempt to open the same file for every assembly region in HaplotypeCaller, and the single-machine Spark case is one that we definitely care about.

* Joins an RDD of GATKReads to variant data by copying the variants files to every node, using Spark's file
* copying mechanism.
*/
public final class JoinReadsWithVariants {
jamesemery (Collaborator):

It's a little strange that you put this functionality in its own class like this. I would probably move these methods to SparkUtils, or possibly even put them into a walker that lives on top of BaseRecalibratorSpark and contains the logic for this traversal. At the very least, if you don't want to fold this into some other class, you should move it to the org.broadinstitute.hellbender.utils.spark package.

tomwhite (Contributor, Author):

Moved to org.broadinstitute.hellbender.utils.spark. (BTW I put it where it was originally since the name and package echo the other *JoinReadsWithVariants classes; however, we are going to remove those soon, I hope.)

});
}

private static Tuple2<GATKRead, Iterable<GATKVariant>> getOverlapping(final GATKRead read, final List<FeatureDataSource<VariantContext>> variantSources) {
jamesemery (Collaborator):

Can you rename these getOverlapping methods? You use the name for several methods here, each doing a different thing, and since they are all private they aren't a meaningful overload of each other.

tomwhite (Contributor, Author):

Done

@@ -195,11 +200,14 @@ public static void callVariantsWithHaplotypeCallerAndWriteOutput(
     // Reads must be coordinate sorted to use the overlaps partitioner
     final SAMFileHeader readsHeader = header.clone();
     readsHeader.setSortOrder(SAMFileHeader.SortOrder.coordinate);
-    final JavaRDD<GATKRead> coordinateSortedReads = SparkUtils.sortReadsAccordingToHeader(reads, readsHeader, numReducers);
+    final JavaRDD<GATKRead> coordinateSortedReads = reads;
jamesemery (Collaborator):

Eliminating this sort is dangerous. If you are going to do it (and it is probably better to avoid sorting at all, especially if we were doing it needlessly), you should add a check that asserts the sort order of the input and no longer set the header sort order. You could also do what MarkDuplicatesSpark does and perform the sort only if the input is out of the expected order.

tomwhite (Contributor, Author):

You're right. I've added a check as you suggested.
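For illustration, a minimal sketch of the kind of sort-order assertion described (hedged; the actual check in the PR may differ, e.g. by re-sorting like MarkDuplicatesSpark does):

import htsjdk.samtools.SAMFileHeader;
import org.broadinstitute.hellbender.exceptions.UserException;

// Fail fast instead of relabeling the header: the overlaps partitioner needs coordinate order.
if (header.getSortOrder() != SAMFileHeader.SortOrder.coordinate) {
    throw new UserException("Reads must be coordinate sorted for the overlaps partitioner, but sort order was: "
            + header.getSortOrder());
}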

final ReferenceMultiSourceAdapter referenceSource = new ReferenceMultiSourceAdapter(referenceMultiSource);
final HaplotypeCallerEngine hcEngine = new HaplotypeCallerEngine(hcArgsBroadcast.value(), false, false, header, referenceSource, annotatorEngineBroadcast.getValue());
final String pathOnExecutor = SparkFiles.get(referenceFileName);
final ReferenceSequenceFile taskReferenceSequenceFile = CachingIndexedFastaSequenceFile.checkAndCreate(IOUtils.getPath(pathOnExecutor));
jamesemery (Collaborator):

So this will end up opening a CachingIndexedFastaSequenceFile over the stored reference for each region? Should we expect this to have a performance impact where Spark is being run on a single machine with many cores (as opposed to a cluster)?

jamesemery (Collaborator):

Also, not sure if this is strictly relevant or would help performance, but if the caching distance were a parameter we could save ourselves some of the memory overhead by using the assembly region size as the caching size, since we aren't caching between assembly regions as far as I can tell.

tomwhite (Contributor, Author):

Yes, it will open a CachingIndexedFastaSequenceFile per region. In terms of memory this is not an issue, as it will use only ~1MB per instance. In terms of file contention, I haven't tried it, but I suspect the cache itself will help (since most accesses hit the cache) and using an SSD will mitigate it too. This is something we'd need to try with different cache sizes, numbers of executors, and thread counts on a single machine.
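If the caching distance were exposed as suggested above, the per-region open might look roughly like this; a sketch only, under the assumption that CachingIndexedFastaSequenceFile offers a constructor overload taking a cache size in bases, with assemblyRegionSize as a stand-in:

// Assumption: a cache-size overload exists; the default cache is on the order of 1MB of bases.
final long cacheSize = assemblyRegionSize;  // cache exactly one region's span, nothing more
final ReferenceSequenceFile taskRef =
        new CachingIndexedFastaSequenceFile(IOUtils.getPath(pathOnExecutor), cacheSize);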

import java.util.List;
import java.util.stream.Collectors;

;
jamesemery (Collaborator):

spurious ";"

tomwhite (Contributor, Author):

Fixed

throw new IllegalArgumentException("Unrecognized known sites file extension. Must be .vcf or .vcf.gz");
}
// TODO: check existence of all files
ctx.addFile(vcfFileName);
jamesemery (Collaborator):

When these files are added, a copy gets created on each worker node? So if a single machine houses 4 worker nodes then we would expect it to house 4 copies of the file?

tomwhite (Contributor, Author):

With the YARN scheduler (used on Dataproc for example), each Spark executor gets a copy of the file. So if a single machine had 4 YARN containers each running a Spark executor, then yes it would have 4 copies of the file.

In Spark local mode there is a single executor, so a single copy of the file.


     if (outputBam != null) { // only write output of BQSR if output BAM is specified
         writeReads(ctx, outputBam, finalReads, header);
     }

     // Run Haplotype Caller
     final ReadFilter hcReadFilter = ReadFilter.fromList(HaplotypeCallerEngine.makeStandardHCReadFilters(), header);
-    final JavaRDD<GATKRead> filteredReadsForHC = finalReads.filter(read -> hcReadFilter.test(read));
-    filteredReadsForHC.persist(StorageLevel.DISK_ONLY()); // without caching, computations are run twice as a side effect of finding partition boundaries for sorting
+    final JavaRDD<GATKRead> filteredReadsForHC = finalReads.filter(hcReadFilter::test);
jamesemery (Collaborator):

I assume this was an empirical observation that the caching didn't help? Out of curiosity how much of a slowdown was caused by the caching here?

tomwhite (Contributor, Author):

This was removed because HaplotypeCallerSpark no longer sorts, so there is no need to cache any more. I didn't measure how much of a slowdown the caching caused.

@@ -48,4 +53,36 @@ public static RecalibrationReport apply( final JavaPairRDD<GATKRead, ReadContext
final StandardCovariateList covariates = new StandardCovariateList(recalArgs, header);
return RecalUtils.createRecalibrationReport(recalArgs.generateReportTable(covariates.covariateNames()), quantizationInfo.generateReportTable(), RecalUtils.generateReportTables(combinedTables, covariates));
}

/**
* Run the {@link BaseRecalibrationEngine} on reads and overlapping variants.
jamesemery (Collaborator):

Given that the tool and its execution in ReadsPipelineSpark have been updated to use this method, we should just delete the other apply() function.

tomwhite (Contributor, Author):

Done

@tomwhite (Contributor, Author)

Thanks for the review James.

Regarding keeping broadcast for BQSR: I'm not sure we need to. The Spark files approach is superior for several reasons: it's a lot faster (~2x); it uses a lot less memory (MB vs GB); it works progressively (conversely, reading the reference into memory takes a few minutes and can't be parallelized, so by Amdahl's law it limits the overall speedup); and it doesn't require converting fasta reference files to 2bit.

We don't yet know how this performs on single multi-core machines (in Spark local mode), but I think we can examine and optimize that later.
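To make the contrast concrete, a schematic sketch of the two distribution strategies (loadReferenceIntoMemory is a stand-in, not a GATK method):

import org.apache.spark.broadcast.Broadcast;

// Broadcast (old): load the whole reference on the driver, then deserialize it into every
// executor's heap. Minutes of unparallelizable work up front and GBs of RAM per executor.
final Broadcast<byte[]> refBroadcast = ctx.broadcast(loadReferenceIntoMemory("ref.2bit"));

// SparkFiles (new): ship the fasta as a plain file; each task opens an indexed reader
// lazily and reads only the bases it needs. MBs of RAM and no up-front load.
ctx.addFile("ref.fasta");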

@codecov-io

codecov-io commented Aug 27, 2018

Codecov Report

Merging #5127 into master will decrease coverage by 0.041%.
The diff coverage is 72.662%.

@@               Coverage Diff               @@
##              master     #5127       +/-   ##
===============================================
- Coverage     86.796%   86.755%   -0.041%     
+ Complexity     29779     29766       -13     
===============================================
  Files           1822      1825        +3     
  Lines         137732    137726        -6     
  Branches       15184     15188        +4     
===============================================
- Hits          119546    119484       -62     
- Misses         12665     12721       +56     
  Partials        5521      5521
| Impacted Files | Coverage Δ | Complexity Δ |
| --- | --- | --- |
| ...ender/tools/spark/transforms/ApplyBQSRSparkFn.java | 80% <100%> (-3.333%) | 2 <0> (-2) |
| ...rk/pipelines/BQSRPipelineSparkIntegrationTest.java | 90.476% <100%> (-2.627%) | 4 <0> (-2) |
| ...nder/tools/spark/pipelines/ReadsPipelineSpark.java | 89.583% <100%> (+2.546%) | 12 <4> (-5) ⬇️ |
| ...der/tools/HaplotypeCallerSparkIntegrationTest.java | 61.765% <100%> (ø) | 13 <2> (ø) ⬇️ |
| .../hellbender/tools/spark/BaseRecalibratorSpark.java | 93.333% <100%> (+6.377%) | 6 <1> (-3) ⬇️ |
| ...ools/spark/transforms/BaseRecalibratorSparkFn.java | 93.333% <100%> (-1.404%) | 3 <3> (ø) |
| ...ender/tools/spark/pipelines/BQSRPipelineSpark.java | 100% <100%> (ø) | 5 <1> (-3) ⬇️ |
| ...k/pipelines/ReadsPipelineSparkIntegrationTest.java | 96.97% <100%> (+4.323%) | 7 <0> (+1) ⬆️ |
| ...der/utils/collections/AutoCloseableCollection.java | 18.75% <18.75%> (ø) | 1 <1> (?) |
| ...hellbender/utils/iterators/CloseAtEndIterator.java | 37.5% <37.5%> (ø) | 2 <2> (?) |

... and 20 more

@jamesemery (Collaborator)

@tomwhite Like we discussed this morning, we can and should get rid of the broadcast code, but we should ideally first get some sort of plot we can point to in order to justify the change. This will also be useful for future presentations of our performance improvements. Ideally the plot would compare the new distribution strategy against broadcasting across a variable number of cores, so the performance improvement can be better understood.

@tomwhite (Contributor, Author)

tomwhite commented Sep 6, 2018

Thanks @jamesemery. I have run a few benchmarks with the old and new code for comparison. The raw data (times in minutes) looks like this:

dataset,executors,code,time
exome,10,old,24.10
exome,10,new,13.28
exome,20,old,17.03
exome,20,new,8.51
genome,20,old,94.23
genome,20,new,54.85

Here is a bar graph comparing the exome data [figure: gatk-5127-exome], and the genome data [figure: gatk-5127-genome].

@jamesemery (Collaborator)

@tomwhite These graphs look good! To clarify: is the improvement these graphs demonstrate due solely to the new method for distributing the reference data to the nodes, or does it include the other improvements? Unfortunately we still probably want to show definitively that the reference distribution is better than the current broadcast approach, so we can justify removing it entirely in this PR.

@tomwhite (Contributor, Author)

@jamesemery you're right, the numbers include changes to distributing both the reference data and the known sites data (VCF).

To get a better idea of the effect on reference data alone, I ran HaplotypeCallerSpark (which only uses the reference, not known sites) on a coordinate-sorted exome BAM with and without the change in this PR.

HaplotypeCallerSpark, broadcast reference: 9.25 minutes
HaplotypeCallerSpark, copy reference with SparkFiles: 6.44 minutes

So the new code is ~30% faster ((9.25 - 6.44) / 9.25 ≈ 0.30), just for the reference.

@jamesemery (Collaborator) left a comment

One small comment, and otherwise this looks good. We should, however, open another PR to remove the 2bit reference support code from the engine once this goes in.

*/
protected String addReferenceFilesForSpark(JavaSparkContext ctx, String referenceFile) {
Path referencePath = IOUtils.getPath(referenceFile);
Path indexPath = ReferenceSequenceFileFactory.getFastaIndexFileName(referencePath);
jamesemery (Collaborator):

I presume this behaves reasonably if the files are empty, correct? We aren't restricting our Spark files to running only in the case where both a dict and a fai exist next door?

tomwhite (Contributor, Author):

I added a file-existence check for the dict and fai. Also made the method static.
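A sketch of the kind of existence check described, using htsjdk's helpers (the exact exception type and wording in GATK may differ):

import java.nio.file.Files;
import java.nio.file.Path;
import htsjdk.samtools.reference.ReferenceSequenceFileFactory;
import org.broadinstitute.hellbender.exceptions.UserException;

final Path indexPath = ReferenceSequenceFileFactory.getFastaIndexFileName(referencePath);
final Path dictPath = ReferenceSequenceFileFactory.getDefaultDictionaryForReferenceSequence(referencePath);
if (!Files.exists(indexPath) || !Files.exists(dictPath)) {
    throw new UserException("Spark tools require a .fai index and a .dict sequence dictionary alongside the reference: "
            + referencePath);
}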

* @param knownVariants the known variant (VCF) files; can be local files or remote paths
* @return the known-sites file names; the absolute path of each file can be found by a Spark task using {@code SparkFiles#get()}
*/
protected List<String> addKnownSitesForSpark(JavaSparkContext ctx, List<String> knownVariants) {
jamesemery (Collaborator):

Looking at every usage of this method, it has to be called immediately before the JoinReadsWithVariants methods are called. I would either make this more generic, like addVCFsToSpark, so it could be used for other purposes, or explicitly make it part of JoinReadsWithVariants to avoid confusion about passing around the wrong List objects when people need to distribute VCF files. Either way, it looks to me like this method could be static anyway.

tomwhite (Contributor, Author):

Renamed and made static.
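For reference, the renamed helper might have roughly this shape (the name addVCFsForSpark and the body are a sketch of the review suggestion, not necessarily the final GATK code):

import java.util.List;
import java.util.stream.Collectors;

public static List<String> addVCFsForSpark(final JavaSparkContext ctx, final List<String> vcfFiles) {
    vcfFiles.forEach(ctx::addFile);  // ship each VCF (and, in the real code, its index) to executors
    // Return the bare file names, resolvable on executors via SparkFiles.get(name).
    return vcfFiles.stream()
            .map(f -> IOUtils.getPath(f).getFileName().toString())
            .collect(Collectors.toList());
}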

@jamesemery (Collaborator)

@tomwhite If you can address the one issue I had with the classes (it could also be accomplished by sterner commenting on the relevant methods and classes, so long as we are avoiding confusion somehow), then I think we can try to get this in quickly for Wednesday's release.

Use SparkFiles to localize reference and known sites for BaseRecalibratorSpark,
BQSRPipelineSpark, and ReadsPipelineSpark, and to localize the reference for HaplotypeCallerSpark.

As a consequence of this change, the reference can be a standard fasta file for Spark
tools (indeed 2bit is no longer supported, since htsjdk does not have support for it).

Also:
* Add AutoCloseableCollection to automatically close all objects in a collection.
* Add CloseAtEndIterator to call close on an object at the end of the iteration.
* Don't materialize all reads in memory in ApplyBQSRSparkFn.
Adjust Spark Eval script settings.
@tomwhite (Contributor, Author)

tomwhite commented Oct 2, 2018

Thanks for the review @jamesemery. I've made the changes you suggested. It should be OK to merge now.

@jamesemery (Collaborator) left a comment

This looks good; let's merge it.

@jamesemery jamesemery merged commit 2ee7df3 into master Oct 2, 2018
tomwhite added a commit that referenced this pull request Oct 8, 2018
* AddContextDataToReadSpark (and AddContextDataToReadSparkUnitTest) implemented the different JoinStrategy options for
  BQSR; has been replaced with the Spark Files mechanism (see #5127)
* BroadcastJoinReadsWithRefBases and JoinReadsWithRefBasesSparkUnitTest were only used by AddContextDataToReadSpark
* BroadcastJoinReadsWithVariants and JoinReadsWithVariantsSparkUnitTest were only used by AddContextDataToReadSpark
* ShuffleJoinReadsWithRefBases and ShuffleJoinReadsWithVariants were only used by AddContextDataToReadSpark
* JoinStrategy was only used for BQSR (HC always uses overlaps partitioner), but is no longer used since #5127
* KnownSitesCache was replaced with Spark Files
* ReferenceMultiSourceAdapter in HaplotypeCallerSpark was replaced with the regular ReferenceDataSource
* BaseRecalibratorEngineSparkWrapper was only used by BaseRecalibratorSparkSharded, which was removed in #5192
EdwardDixon pushed a commit to EdwardDixon/gatk that referenced this pull request Nov 9, 2018
Use SparkFiles to localize reference and known sites (#5127)
jamesemery pushed a commit that referenced this pull request Jan 11, 2019
jamesemery pushed a commit that referenced this pull request Jan 11, 2019
@davidbenjamin davidbenjamin deleted the tw_spark_files_optimization branch February 24, 2021 15:07