Skip to content

Commit

Permalink
Don't reuse the downsampler between shards since it retains state.
Browse files: browse the repository at this point in the history
  • Loading branch information
tomwhite committed Nov 20, 2018
1 parent 2cad93d commit e4bd5da
Showing 1 changed file with 8 additions and 7 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -151,14 +151,15 @@ private static FlatMapFunction<Iterator<Shard<GATKRead>>, ActivityProfileStateRa
ReferenceDataSource reference = referenceFileName == null ? null : new ReferenceFileSource(IOUtils.getPath(SparkFiles.get(referenceFileName)));
final FeatureManager features = bFeatureManager == null ? null : bFeatureManager.getValue();
AssemblyRegionEvaluator assemblyRegionEvaluator = supplierBroadcast.getValue().get(); // one AssemblyRegionEvaluator instance per Spark partition
final ReadsDownsampler readsDownsampler = assemblyRegionArgs.maxReadsPerAlignmentStart > 0 ?
new PositionalDownsampler(assemblyRegionArgs.maxReadsPerAlignmentStart, header) : null;


return Utils.stream(shardedReadIterator)
.map(shardedRead -> new ShardToMultiIntervalShardAdapter<>(shardedRead))
// TODO: reinstate downsampling (not yet working)
// new DownsampleableSparkReadShard(
// new ShardBoundary(shardedRead.getInterval(), shardedRead.getPaddedInterval()), shardedRead, readsDownsampler)))
.map(shardedRead -> {
final ReadsDownsampler readsDownsampler = assemblyRegionArgs.maxReadsPerAlignmentStart > 0 ?
new PositionalDownsampler(assemblyRegionArgs.maxReadsPerAlignmentStart, header) : null;
return new ShardToMultiIntervalShardAdapter<>(
new DownsampleableSparkReadShard(
new ShardBoundary(shardedRead.getInterval(), shardedRead.getPaddedInterval()), shardedRead, readsDownsampler));
})
.map(shardedRead -> {
final Iterator<ActivityProfileState> activityProfileStateIter = new ActivityProfileStateIterator(
new ShardToMultiIntervalShardAdapter<>(shardedRead),
Expand Down

0 comments on commit e4bd5da

Please sign in to comment.